Hi folks,
I have a streaming job that consumes from a Kafka topic. The topic is pretty active, so the local-mode single worker is obviously not able to keep up with the fire-hose. I expect the job to skip records and continue on. However, I'm getting an exception from the LegacyFetcher which kills the job. This is very much *not* what I want. Any thoughts? The only thing I find when I search for this error message is a link back to FLINK-2656. I'm running roughly 0.10-release/HEAD.

Thanks a lot,
Nick

java.lang.Exception: Found invalid offsets more than once in partitions [FetchPartition {partition=X, offset=Y}] Exceptions:
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:399)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Found invalid offsets more than once in partitions [FetchPartition {partition=X, offset=Y}] Exceptions:
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:412)
|
Hi Nick,

I'm sorry you ran into the issue. Is it possible that Flink's Kafka consumer falls so far behind in the topic that the offsets it's requesting are invalid? For that to happen, Kafka's retention time has to be pretty short.

Skipping records under load is currently not supported by Flink itself. The only idea I had for handling this would be to give the DeserializationSchema a callback to request the latest offset from Kafka to determine the lag. With that, the schema could determine a "dropping rate" to catch up.

How would you, as an application developer, expect the situation to be handled?

Just out of curiosity: What's the throughput you have on the Kafka topic?

On Fri, Jan 15, 2016 at 10:13 PM, Nick Dimiduk <[hidden email]> wrote:
|
@Robert: Is it possible to add a "fallback" strategy to the consumer? Something like "if offsets cannot be found, use latest"?

I would make this an optional feature to activate. I think it would be quite surprising to users if records started being skipped in certain situations. But I can see that this would be desirable sometimes.

More control over skipping records could be something to implement in an extended version of the Kafka consumer. A user could define a policy that, in case the consumer falls behind the producer by more than X offsets, it starts requesting the latest offsets (rather than the following ones), thereby skipping a bunch of records.

On Sat, Jan 16, 2016 at 3:14 PM, Robert Metzger <[hidden email]> wrote:
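The lag-based policy described in this message could be sketched roughly as follows. This is only an illustration of the idea, not Flink or Kafka code: the class `LagSkipPolicy`, its methods, and the way the latest offset is obtained are all hypothetical, and the threshold `maxLag` stands in for the "X" mentioned above.

```java
/**
 * Hypothetical sketch (not part of Flink or Kafka): decides which offset
 * to request next, based on how far the consumer lags behind the newest
 * record in a partition.
 */
final class LagSkipPolicy {

    /** The "X" from the discussion: the lag, in offsets, that is still tolerated. */
    private final long maxLag;

    LagSkipPolicy(long maxLag) {
        if (maxLag < 0) {
            throw new IllegalArgumentException("maxLag must be >= 0");
        }
        this.maxLag = maxLag;
    }

    /**
     * @param nextOffset   offset the consumer would normally request next
     * @param latestOffset newest offset currently available in the partition
     *                     (assumed to be supplied by the caller)
     * @return latestOffset if the lag exceeds maxLag, otherwise nextOffset
     */
    long offsetToFetch(long nextOffset, long latestOffset) {
        long lag = latestOffset - nextOffset;
        return lag > maxLag ? latestOffset : nextOffset;
    }

    /** Number of records that the decision above would skip. */
    long skipped(long nextOffset, long latestOffset) {
        return offsetToFetch(nextOffset, latestOffset) - nextOffset;
    }

    public static void main(String[] args) {
        LagSkipPolicy policy = new LagSkipPolicy(1000);
        // Small lag: keep reading sequentially.
        System.out.println(policy.offsetToFetch(500, 1200));
        // Large lag: jump to the latest offset and report how much was skipped.
        System.out.println(policy.offsetToFetch(500, 5000));
        System.out.println(policy.skipped(500, 5000));
    }
}
```

Keeping the decision in a small, side-effect-free class like this would make the policy optional and easy to swap, in line with the suggestion that skipping should be something users activate explicitly.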
|
This goes back to the idea that streaming applications should never go down. I'd much rather consume at max capacity and knowingly drop some portion of the incoming pipe than have the streaming job crash. Of course, once the job itself is robust, I still need the runtime to be robust -- YARN vs (potential) Mesos vs standalone cluster will be my next consideration.

I can share some details about my setup, but not at this time; in part because I don't have my metrics available at the moment and in part because this is a public, archived list.

On Sat, Jan 16, 2016 at 8:23 AM, Stephan Ewen <[hidden email]> wrote:
|
Hi Nick!

I agree, real time streams should never go down.

Whether you want to allow the stream processor to temporarily fall behind (back pressure on an event spike) and catch up a bit later, or whether you want to be always at the edge of real time and drop messages, is use case specific. Both should be supported.

Since we interpret streaming very broadly (also including analysis of historic streams or timely data), the "backpressure/catch-up" mode seemed natural as the first one to implement.

The "load shedding" variant can probably even be realized in the Kafka consumer, without complex modifications to the core Flink runtime itself.

Greetings,
Stephan

On Sun, Jan 17, 2016 at 12:42 AM, Nick Dimiduk <[hidden email]> wrote:
|
On Sunday, January 17, 2016, Stephan Ewen <[hidden email]> wrote:
Glad to hear that :)
Agreed.
Indeed, this is what my job is doing. I have set it to start from the beginning when it lacks a valid offset. I have to presume that in my case the stream data is expiring faster than my consumers can keep up. However, I haven't investigated proper monitoring yet.
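For readers following along, the behaviour described above most likely corresponds to the Kafka 0.8 consumer property `auto.offset.reset`, which accepts `smallest` (resume from the earliest retained record) or `largest` (resume from the newest). The thread does not show the actual configuration, so the sketch below is an assumption: the class name `ConsumerProps` is made up, and the broker, ZooKeeper, and group values are placeholders.

```java
import java.util.Properties;

/**
 * Hypothetical helper that builds Kafka 0.8-style consumer properties.
 * All connection values are placeholders, not taken from the thread.
 */
final class ConsumerProps {

    static Properties build(boolean startFromEarliest) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("zookeeper.connect", "localhost:2181"); // placeholder
        props.setProperty("group.id", "example-group");           // placeholder
        // Where to resume when the requested offset is missing or invalid:
        // "smallest" = earliest retained record, "largest" = newest record.
        props.setProperty("auto.offset.reset",
                startFromEarliest ? "smallest" : "largest");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build(true).getProperty("auto.offset.reset"));
        System.out.println(build(false).getProperty("auto.offset.reset"));
    }
}
```

With `smallest`, a consumer that has fallen out of the retention window resumes at the oldest data still available, so it stays as far behind as possible; `largest` would instead skip ahead to the newest records.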
|
Hey Nick,

I had a discussion with Stephan Ewen on how we could resolve the issue. I filed a JIRA with our suggested approach:
https://issues.apache.org/jira/browse/FLINK-3264

By handling this directly in the KafkaConsumer, we would avoid fetching data we cannot handle anyway (discarding in the deserialization schema would be less efficient). Let us know what you think about our suggested approach.

Sadly, it seems that the Kafka 0.9 consumer API does not yet support requesting the latest offset of a TopicPartition. I'll ask about this on their ML.

On Sun, Jan 17, 2016 at 8:28 PM, Nick Dimiduk <[hidden email]> wrote:
On Sunday, January 17, 2016, Stephan Ewen <[hidden email]> wrote:
|
For what it's worth, I dug into the TM logs and found that this exception was not the root cause, merely a symptom of other backpressure building in the flow (actually, lock contention in another part of the stack). While Flink was helpful in finding and bubbling up this stack to the UI, it was ultimately misleading and caused me to overlook proper evaluation of the failure.

On Wed, Jan 20, 2016 at 2:59 AM, Robert Metzger <[hidden email]> wrote:
|
Was the contended lock part of Flink's runtime, or the application code? If it was part of the Flink runtime, can you share what you found?

On Thu, Feb 25, 2016 at 6:03 PM, Nick Dimiduk <[hidden email]> wrote:
|
Sorry I wasn't clear. No, the lock contention is not in Flink.
On Friday, February 26, 2016, Stephan Ewen <[hidden email]> wrote:
|