KeyedStream and chained forward operators


KeyedStream and chained forward operators

Cliff Resnick
I'm running a massive DataStream job from S3 that sifts files by timestamp.

The basic job is:
FileMonitor -> ContinuousFileReader -> MultipleFileOutputSink 

The MultipleFileOutputSink sifts data by timestamp into date-hour directories.
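(Conceptually the date-hour bucketing is just something like the sketch below; this is a simplified stand-in, my actual BidDateFunc logic differs in the details.)

    import java.time.{Instant, ZoneOffset}
    import java.time.format.DateTimeFormatter

    // Simplified sketch of mapping a record's epoch-second timestamp to a
    // date-hour output directory; the real sink / BidDateFunc logic may differ.
    val dateHourFormatter =
      DateTimeFormatter.ofPattern("yyyy-MM-dd/HH").withZone(ZoneOffset.UTC)

    def dateHourDir(epochSeconds: Long): String =
      dateHourFormatter.format(Instant.ofEpochSecond(epochSeconds))

    // e.g. dateHourDir(1587500000L) == "2020-04-21/20"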

It's a lot of data, so I'm using high parallelism, but I also want to maintain a reasonable output file size. If I key the stream after the ContinuousFileReader by 5-minute timestamp buckets, I get the desired result of large files, at the cost of a network shuffle.
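Roughly, that post-reader keying looks like the sketch below (readerOutput and extractEpochSeconds are simplified stand-ins for my actual stream and record parsing):

    import org.apache.flink.api.java.functions.KeySelector

    // Sketch: key the reader output into 5-minute buckets so each sink subtask
    // writes large files; readerOutput / extractEpochSeconds are stand-ins.
    readerOutput
      .keyBy(new KeySelector[String, Long]() {
        override def getKey(line: String): Long = {
          val t = extractEpochSeconds(line)  // record's epoch-second timestamp
          t - (t % 300)                      // truncate to a 5-minute bucket
        }
      })
      .addSink(SourceTrackingSink(Sift.outputBucket, BidDateFunc))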

But since the input file names also carry timestamps, I figured I could push the keyBy back to the FileMonitor -> ContinuousFileReader edge and save the network shuffle. I tested this, and it partly works: the ContinuousFileReaders receive properly partitioned input, but the output after the reader is now rebalanced and the sinks produce lots of tiny files.

The code is below. Am I missing something?
val source = env
  .addSource(fileMonitor)
  .name(s"Bucketed Log Source File Watcher: $path")
  .keyBy(new KeySelector[TimestampedFileInputSplit, Long]() {
    override def getKey(split: TimestampedFileInputSplit): Long = {
      val name = split.getPath.getName
      val r = """(\d+)\.log""".r
      r.findFirstMatchIn(name) match {
        case Some(m) ⇒ {
          val t = m.group(1).toLong
          t - (t % 300)
        }
        case _ ⇒ -1
      }
    }
  })
  .transform[String]("Bucketed Log Source File Reader", fileReader)
  .forward
  .assignTimestampsAndWatermarks(WatermarkExtractor[String])
  .forward
  .addSink(SourceTrackingSink(Sift.outputBucket, BidDateFunc))




Re: KeyedStream and chained forward operators

Piotr Nowojski-3
Hi,

I’m not sure how we can help you here. To my eye your code looks OK, and what you figured out about pushing the keyBy in front of the ContinuousFileReader is also valid; it makes sense if you can indeed perform the keyBy correctly based on the input splits. The problem is probably somewhere in your custom logic: maybe your KeySelector is not working exactly as expected, or maybe the bucketing is misbehaving? I would suggest reproducing the problem locally (not on a cluster) with some minimal data set, then decomposing your job into smaller components and validating them independently, or bisecting the job to pinpoint where the problem is.
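For example, the bucketing logic from your KeySelector can be checked in isolation, without Flink at all; the file names and expected buckets below are just made-up examples:

    // Standalone check of the filename-to-bucket logic, independent of Flink.
    object KeySelectorCheck {
      def bucketOf(name: String): Long = {
        val r = """(\d+)\.log""".r
        r.findFirstMatchIn(name) match {
          case Some(m) =>
            val t = m.group(1).toLong
            t - (t % 300)
          case _ => -1L
        }
      }

      def main(args: Array[String]): Unit = {
        // 1587500000 % 300 == 200, so the expected 5-minute bucket is 1587499800
        assert(bucketOf("1587500000.log") == 1587499800L)
        // names without a numeric ".log" suffix should fall into the -1 bucket
        assert(bucketOf("not-a-timestamp.txt") == -1L)
        println("bucketOf behaves as expected")
      }
    }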

I’m not sure if you are aware of it, but just in case: please take a look at the ways you can test your job [1].
 
Piotrek

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html
