| 
		My Flink job is doing aggregations on top of event-time based windowing across Kafka partitions. As I have been developing and restarting it, the state for the catch-up periods becomes unreliable -- lots of duplicate emits for time windows already seen before, that I have to discard since my sink can't handle it. There may be a bug in my job, but I wanted to clarify whether this might be a flaw in Flink's handling of this.
 I understand there is m:n mapping of partitions to sources depending on the parallelism. Each source will have its own watermark. During catchup, watermark progression can become pretty fragile, e.g. in my case where there's n partitions and parallelism is 1. I feel like some kind of event time alignment is needed across partitions. I may be completely off here, so I look forward to your thoughts on this! | 
| 
		Things make more sense after coming across http://mail-archives.apache.org/mod_mbox/flink-user/201512.mbox/%3CCANC1h_vVUT3BkFFck5wJA2ju_sSenxmE=Fiizr=ds6tBasYTJQ@mail.gmail.com%3E I need to ensure the parallelism is at least the number of partitions. This seems like a gotcha that could be better documented or automatically enforced. | 
 
	
					
		
	
					| 
		Hi,
 what do you mean by this? I think it should also work when setting parallelism to 1. If not, then there is either a problem with Flink or maybe something in the Data. -Aljoscha > On 08 Feb 2016, at 21:43, shikhar <[hidden email]> wrote: > > Things make more sense after coming across > http://mail-archives.apache.org/mod_mbox/flink-user/201512.mbox/%3CCANC1h_vVUT3BkFFck5wJA2ju_sSenxmE=Fiizr=ds6tBasYTJQ@...%3E > > I need to ensure the parallelism is at least the number of partitions. This > seems like a gotcha that could be better documented or automatically > enforced. > > > > -- > View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-tp4782p4786.html > Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com. | 
| 
		Stephan explained in that thread that we're picking the min watermark when doing operations that join streams from multiple sources. If we have m:n partition-source assignment where m>n, the source is going to end up with the max watermark. Having m<=n ensures that the lowest watermark is used.
 Re: automatic enforcement, perhaps allowing for more than 1 Kafka partition on a source should require opt-in, e.g. allowOversubscription() | 
 
	
					
		
	
					| 
		Hi,
 in general it should not be a problem if one parallel instance of a sink is responsible for several Kafka partitions. It can become a problem if the timestamps in the different partitions differ by a lot and the watermark assignment logic is not able to handle this. How are you assigning the timestamps/watermarks in your job? Cheers, Aljoscha > On 08 Feb 2016, at 21:51, shikhar <[hidden email]> wrote: > > Stephan explained in that thread that we're picking the min watermark when > doing operations that join streams from multiple sources. If we have m:n > partition-source assignment where m>n, the source is going to end up with > the max watermark. Having m<=n ensures that the lowest watermark is used. > > Re: automatic enforcement, perhaps allowing for more than 1 Kafka partition > on a source should require opt-in, e.g. allowOversubscription() > > > > -- > View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-tp4782p4788.html > Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com. | 
 
	
					
		
	
					| Hi,where did you observe the duplicates, within Flink or in Kafka? Flink's exactly-once guarantees are only valid within the Flink DataStream program and for some sinks such as the RollingFileSink. Cheers, Fabian 2016-02-09 10:21 GMT+01:00 Aljoscha Krettek <[hidden email]>: Hi, | 
 
	
					
		
	
					| 
				In reply to this post by shikhar
			 Hi Shikar! What you are seeing is that some streams (here the different Kafka Partitions in one source) get merged in the source task. That happens before watermarks are generated. In such a case, records are out-of-order when they arrive at the timestamp-extractor/watermark generator, and the watermark generator needs to be implemented such that it is aware of these out-of-order records, and uses some heuristic to generate watermarks. This is actually the general case that one also has if timestamps are not ascending inside a single Kafka partition. You probably want to make use of the simple case, where timestamps are ascending inside one Kafka partition, and use the ascending-timestamp-extractor that auto-generates watermarks. With Kafka, that one only works when there is 1:1 sources to partitions. I think we can add some tooling that makes it possible to use the simple ascending timestamp extraction also in cases where one parallel source task has multiple Kafka partitions. Effectively, the Kafka source has to internally generate the watermarks and use the same "watermark union" technique as for example the join operator. Here is the issue to track this: https://issues.apache.org/jira/browse/FLINK-3375 Greetings, Stephan On Mon, Feb 8, 2016 at 9:51 PM, shikhar <[hidden email]> wrote: Stephan explained in that thread that we're picking the min watermark when | 
| 
				In reply to this post by Aljoscha Krettek
			 
		I am assigning timestamps using a threshold-based extractor -- the static delta from last timestamp is probably sufficient and the PriorityQueue for allowing outliers not necessary, that is something I added while figuring out what was going on.
 The timestamps across partitions don't differ that much in normal operation when stream processing is caught up with the head of the partitions, so the thresholding works well. However, during catch-up, like if I stop for a bit & start the job again, or there is no offset in ZK and I'm using 'auto.offset.reset=smallest', the source tends to emit messages with much larger deviations, and the timestamp extraction which is not partition-aware will start providing an incorrect watermark. 
 | 
| 
				In reply to this post by Fabian Hueske-2
			 
		Hi Fabian,
 Sorry, I should have been clearer. What I meant (or now know!) by duplicate emits is that since the watermark is progressing more rapidly than the state of the offsets on some partitions due to the source multiplexing more than 1 partition, when messages from the lagging partitions are passed on to further operators specifically time-based windowing they get emitted immediately, resulting in duplicate windows (https://issues.apache.org/jira/browse/FLINK-2870). | 
 
	
					
		
	
					| 
				In reply to this post by shikhar
			 Thanks for filling us in. If the problem comes from the fact that the difference between partitions becomes high sometimes (when resetting to the smallest offset), then this could probably be solved similarly as suggested here (https://issues.apache.org/jira/browse/FLINK-3375) by running a watermark assigner (ascending, threashold / whatever) per partition inside the Kafka Source. What do you think? On Tue, Feb 9, 2016 at 3:01 PM, shikhar <[hidden email]> wrote: I am assigning timestamps using a threshold-based extractor | 
| 
		Yes that approach seems perfect Stephan, thanks for creating the JIRA!
 It is not only when resetting to smallest, I have observed uneven progress on partitions skewing the watermark any time the source is not caught up to the head of each partition it is handling, like when stopping for a few mins and starting it back up (the offsets it's resuming from are approx the same number of messages behind). | 
| 
				In reply to this post by shikhar
			 
		Hi,
 On a related and a more exaggerated setup, our kafka-producer (flume) seems to send data to a single partition at a time and switches it every few minutes. So when i run my flink datastream program for the first time, it starts on the *largest* offsets and shows something like this: . Fetched the following start offsets [FetchPartition {partition=7, offset=15118832832}] . Fetched the following start offsets [FetchPartition {partition=1, offset=15203613236}] . Fetched the following start offsets [FetchPartition {partition=2, offset=15366811664}] . Fetched the following start offsets [FetchPartition {partition=0, offset=15393999709}] . Fetched the following start offsets [FetchPartition {partition=8, offset=15319475583}] . Fetched the following start offsets [FetchPartition {partition=5, offset=15482889767}] . Fetched the following start offsets [FetchPartition {partition=6, offset=15113885928}] . Fetched the following start offsets [FetchPartition {partition=3, offset=15182701991}] . Fetched the following start offsets [FetchPartition {partition=4, offset=15186569356}] For that instance flume happens to be sending data to partition-6 only, so other consumers sit idly. Working with default paralellism 4, only one of the 4 threads is able to source data and checkpointing logs reflect that: Committing offsets [-915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776] to offset store: FLINK_ZOOKEEPER Committing offsets [-915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, 15114275927, -915623761776, -915623761776] to offset store: FLINK_ZOOKEEPER Committing offsets [-915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776] to offset store: FLINK_ZOOKEEPER Committing offsets [-915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776, -915623761776] to offset store: FLINK_ZOOKEEPER This also means checkpoint will only contain the offset for partition-6. So if program is stopped and restarted at a later time, it restores the offset for partition-6 only and other partitions are started at the largest offset. So it's able to process unseen data in partition-6 but not others. Say if flume produces data to partition-3 when flink program is stopped, they're lost, while the data in partition-6 is not. This generally causes multiple (late-)windows to be fired after restart, because we now generate watermarks off partition-3 which says the windows of the unseen data in partition-6 are already complete. This also has a side effect of windows not triggering unless some rebalancing is done beforehand. Since only 1 of the 4 threads will source data and generate watermarks, window triggers won't get watermarks from other 3 sources and wait long past the watermarks generated from the single source. I know producers shouldn't work like that, but consumers shouldn't care. I think it may also create some edge cases even if things were not as extreme as ours. If checkpoints could contain offsets of all of the partitions regardless of their contents, probably storing start offsets in first run, i guess that would solve the problems around restarting. | 
 
	
					
		
	
					| You are right, the checkpoints should contain all offsets. I created a Ticket for this: https://issues.apache.org/jira/browse/FLINK-3440 On Thu, Feb 18, 2016 at 10:15 AM, agaoglu <[hidden email]> wrote: Hi, | 
| Thanks Stephan On Thu, Feb 18, 2016 at 3:00 PM, Stephan Ewen <[hidden email]> wrote: 
 erdem agaoglu | 
| Hi Erdem, FLINK-3440 has been resolved. The fix is merged to master. 1.0-SNAPSHOT should already contain the fix and it'll be in 1.0.0 (for which I'll post a release candidate today) as well. On Thu, Feb 18, 2016 at 3:24 PM, Erdem Agaoglu <[hidden email]> wrote: 
 | 
| Hi Robert, I switched to SNAPSHOT and confirm that it works. Thanks! On Thu, Feb 25, 2016 at 10:50 AM, Robert Metzger <[hidden email]> wrote: 
 erdem agaoglu | 
| Free forum by Nabble | Edit this page | 
 
	

 
	
	
		
