Hi,
I am pretty new to Flink; I like what I see and have started to build my first application with it, but I must be missing something very fundamental. I have a FlinkKafkaConsumer011, followed by a handful of filter, map and flatMap functions, terminated with the standard CassandraSink. I have try..catch in all my own maps/filters. The first message in the queue is processed after start-up, but any additional messages are swallowed, i.e. consumed but never forwarded to the first map().

I suspect some type of de-duplication is going on. The (test) producer provides different values on each message, but no "key" is passed to the KafkaProducer. Is a key required? And if so, why? Can I tell Flink, or Flink's KafkaConsumer, to ingest all messages and not try to de-duplicate them?

Thanks
--
Niclas Hedhman, Software Developer
http://zest.apache.org - New Energy for Java
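For readers following along, here is a minimal sketch of the kind of topology described above. This is not Niclas's actual code: the topic name, group id, record types and the Cassandra query are all placeholders, and it uses the Flink 1.4-era APIs current at the time of the thread.

import java.util.Properties;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaToCassandraJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.setProperty("group.id", "poll-declarations");       // placeholder group id

        DataStream<Tuple2<String, Long>> stream = env
            // Kafka source: reads string payloads from a single-partition topic.
            .addSource(new FlinkKafkaConsumer011<>("poll-topic", new SimpleStringSchema(), props))
            // Stand-ins for the filter/map/flatMap chain described in the post.
            .filter(json -> !json.isEmpty())
            .map(json -> Tuple2.of(json, System.currentTimeMillis()))
            // Lambdas lose generic type info, so declare the tuple type explicitly.
            .returns(new TypeHint<Tuple2<String, Long>>() {});

        // Standard Cassandra sink; keyspace/table/query are placeholders.
        CassandraSink.addSink(stream)
            .setQuery("INSERT INTO my_ks.polls (payload, ts) VALUES (?, ?);")
            .setHost("127.0.0.1")
            .build();

        env.execute("kafka-to-cassandra");
    }
}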
Hi Niclas,

Flink's Kafka consumer should not apply any deduplication. AFAIK, such a "feature" is not implemented.

Do you produce into the topic that you want to read, or is the data in the topic static? If you do not produce into the topic while the consuming application is running, this might be an issue with the start position of the consumer [1].

Best, Fabian

2018-02-18 8:14 GMT+01:00 Niclas Hedhman <[hidden email]>:
So, the producer is (at the moment) run manually from the command line, one message at a time. Kafka's tooling (a different consumer group) shows that a message is added each time. Since my last post, I have also added a UUID as the key, and that didn't make a difference, so you are likely right that de-duplication is not the cause. There is only a single partition on the topic, so it shouldn't be a partitioning issue.

I also noticed:
1. If I send a message while the consumer topology is running (after the first message), that message is processed only after a restart.
2. If I send many messages while the consumer is running and then restart many times, only a single one of them is processed. No idea what happens to the others.

I am utterly confused. Digging into the internals is not for the faint-hearted, but I can see that kafka.poll() frequently returns with empty records. Will continue debugging that tomorrow...

Niclas

On Feb 18, 2018 18:50, "Fabian Hueske" <[hidden email]> wrote:
Hi Niclas,
About the second point you mentioned: was the processed message a random one or a fixed one? The default startup mode for FlinkKafkaConsumer is StartupMode.GROUP_OFFSETS; maybe you could try StartupMode.EARLIEST while debugging. Also, before that, you may try fetching the messages with the Kafka console consumer tool to see whether they can be consumed completely. Besides, I wonder if you could provide the code for your Flink pipeline. That’ll be helpful. Best, Xingcan
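As a hedged sketch of Xingcan's suggestion (Flink 1.3+ consumer API; topic name, schema and props are the same placeholders as in the sketch above), the startup mode is set on the consumer instance before it is added as a source:

FlinkKafkaConsumer011<String> consumer =
    new FlinkKafkaConsumer011<>("poll-topic", new SimpleStringSchema(), props);

// Default behaviour, equivalent to StartupMode.GROUP_OFFSETS:
// resume from the offsets committed for this group.id.
// consumer.setStartFromGroupOffsets();

// For debugging, ignore committed offsets and re-read the topic from the
// beginning (StartupMode.EARLIEST):
consumer.setStartFromEarliest();

The console-consumer sanity check would look something like:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic poll-topic --from-beginning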
(Sorry for the incoherent order and ramblings. I am writing this as I am trying to sort out what is going on...)

I have tried using both the ObjectMapper from Jackson proper and the shaded ObjectMapper in Flink. No difference.

Recap: positioning the Kafka consumer at the 8th message from the last, only that message is consumed; the remaining 7 are ignored/swallowed.

OK, so I think I have traced this down to something happening in the CassandraSink. An exception is being thrown somewhere, which I can see because the finally clause of Kafka09Fetcher.runFetchLoop() is called.

Found it (hours of debugging later), on this line (Flink 1.4.1):

org/apache/flink/cassandra/shaded/com/google/common/util/concurrent/Futures.class:258

which throws an exception without stepping into the addListener() method. Nothing catches the exception (and I don't want to go down the rabbit hole of building from source), so I can't really say what exception is being thrown. IDEA doesn't seem to report it, and the catch clauses in OperatorChain.pushToOperator() (ClassCastException and Exception) are in the call stack but don't catch it, which could suggest a java.lang.Error. NoClassDefFoundError comes to mind, since there are SO MANY classloading exceptions going on all the time.

Hold on a second... There are TWO com.datastax.driver.core.DefaultResultSetFuture types on the classpath: one from the Cassandra client that I declared, and one from inside the flink-connector-cassandra_2.11 artifact... So will it work if I simply remove my own dependency declaration? YEEEEESSSSS!!! Finally..... SOLVED!

-o-o-o-o-o-

[The pipeline code and dependency list are collapsed in the archive; only the signatures survive: public static void main( String[] args ), private static class GetPollDeclaration, private static class PollDeclarationToTuple3Map, and the Flink dependency block.]

On Sun, Feb 18, 2018 at 8:11 PM, Xingcan Cui <[hidden email]> wrote:
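For reference, a hedged sketch of the dependency fix that resolved the thread. The Gradle format, artifact versions and the commented-out driver coordinates are assumptions (only Flink 1.4.1 / Scala 2.11 and the clashing class name come from the thread itself):

dependencies {
    compile "org.apache.flink:flink-streaming-java_2.11:1.4.1"
    compile "org.apache.flink:flink-connector-kafka-0.11_2.11:1.4.1"

    // The Cassandra connector bundles its own copy of the Datastax driver
    // (and relocates Guava to org.apache.flink.cassandra.shaded).
    compile "org.apache.flink:flink-connector-cassandra_2.11:1.4.1"

    // Do NOT also declare the driver directly -- that is what put a second
    // com.datastax.driver.core.DefaultResultSetFuture on the classpath:
    // compile "com.datastax.cassandra:cassandra-driver-core:..."
}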
--
Niclas Hedhman, Software Developer
http://zest.apache.org - New Energy for Java
Hi Niclas, Glad that you got it working!

2018-02-19 9:29 GMT+01:00 Niclas Hedhman <[hidden email]>: