Hello,
As far as I've seen, there are a lot of projects using Flink and Kafka together, but I'm not seeing the point of that. Let me know what you think about this. 1. If I'm not wrong, Kafka provides basically two things: storage (records retention) and fault tolerance in case of failure, while Flink mostly cares about the transformation of such records. That means I can write a pipeline with Flink alone, and even distribute it on a cluster, but in case of failure some records may be lost, or I won't be able to reprocess the data if I change the code, since the records are not kept in Flink by default (only when sinked properly). Is that right? 2. In my use case the records come from a WebSocket and I create a custom class based on messages on that socket. Should I put those records inside a Kafka topic right away using a Flink custom source (SourceFunction) with a Kafka sink (FlinkKafkaProducer), and independently create a Kafka source (KafkaConsumer) for that topic and pipe the Flink transformations there? Is that data flow fine? Basically what I'm trying to understand with both question is how and why people are using Flink and Kafka. Regards, Matt |
Hi Matt, as you've stated Flink is a stream processor and as such it needs to get its inputs from somewhere. Flink can provide you up to exactly-once processing guarantees. But in order to do this, it requires a re-playable source because in case of a failure you might have to reprocess parts of the input you had already processed prior to the failure. Kafka is such a source and people use it because it happens to be one of the most popular and widespread open source message queues/distributed logs. If you don't require strong processing guarantees, then you can simply use the WebSocket source. But, for any serious use case, you probably want to have these guarantees because otherwise you just might calculate bogus results. So in your case I would directly ingest my messages into Kafka and then let Flink read from the created topic to do the processing. Cheers, Till On Tue, Nov 15, 2016 at 8:14 AM, Dromit <[hidden email]> wrote:
|
"So in your case I would directly ingest my messages into Kafka" I will do that through a custom SourceFunction that reads the messages from the WebSocket, creates simple java objects (POJOs) and sink them in a Kafka topic using a FlinkKafkaProducer, if that makes sense. The problem now is I need a DeserializationSchema for my class. I read Flink is able to de/serialize POJO objects by its own, but I'm still required to provide a serializer to create the FlinkKafkaProducer (and FlinkKafkaConsumer). Any idea or example? Should I create a DeserializationSchema for each POJO class I want to put into a Kafka stream? On Tue, Nov 15, 2016 at 7:43 AM, Till Rohrmann <[hidden email]> wrote:
|
Hi Matt, Here’s an example of writing a DeserializationSchema for your POJOs: [1]. As for simply writing messages from WebSocket to Kafka using a Flink job, while it is absolutely viable, I would not recommend it, mainly because you’d never know if you might need to temporarily shut down Flink jobs (perhaps for a version upgrade). Shutting down the WebSocket consuming job, would then, of course, lead to missing messages during the shutdown time. It would be perhaps simpler if you have a separate Kafka producer application to directly ingest messages from the WebSocket to Kafka. You wouldn’t want this application to be down at all, so that all messages can safely land into Kafka first. I would recommend to keep this part as simple as possible. From there, like Till explained, your Flink processing pipelines can rely on Kafka’s replayability to provide exactly-once processing guarantees on your data. Best, Gordon On November 16, 2016 at 1:07:12 PM, Dromit ([hidden email]) wrote:
|
I have several times the same thoughts that Dromit: (Kafka cluster is a expensiver over cost) Can someone check this Ideas? - All values have time-stamp - Data Source insert the WaterMark in the Source. Some code example ? 2016-11-16 8:23 GMT+01:00 Tzu-Li (Gordon) Tai <[hidden email]>:
|
In reply to this post by Matt
Hi Dromit
I started using Flink with Kafka but am currently looking into Kinesis to replace Kafka. The reason behind this is that eventually my application will run in somebody's cloud and if I go for AWS then I don't have to take care of operating Kafka and Zookeeper myself. I understand this can be a challenging task. Up to know where the Kafka bit is only running in a local test environment I am happy running it as I just start 2 Docker containers and it does the job. But this also means I have no clue how Kafka really works and what I need to be careful with. Besides knowledge which is required as it seems for Kafka costs is another aspect here. If one wants to operate a Kafka cluster plus Zookeeper on let's say the Amazon cloud this might actually be more expensive than "just" using Kinesis as a service. There are apparently draw backs in terms of functionality and performance but for my use case that does not seem to matter. Philipp |
Tzu-Li Tai, thanks for your response. I've seen the example you mentioned before, TaxiRideSchema.java, but it's way too simplified. In a real POJO class you may have multiple fields such as integers, strings, doubles, etc. So serializing them as a string like in the example wouldn't work (you can't put together two arbitrary strings and later split the byte array to get each of them, same for two integers, and nearly any other types). I feel there should be a more general way of doing this regardless of the fields on the class you're de/serializing. What do you do in these cases? It should be a pretty common scenario! Regards, Matt On Wed, Nov 16, 2016 at 2:01 PM, Philipp Bussche <[hidden email]> wrote: Hi Dromit |
Just to be clear, what I'm looking for is a way to serialize a POJO class for Kafka but also for Flink, I'm not sure the interface of both frameworks are compatible but it seems they aren't. For Kafka (producer) I need a Serializer and a Deserializer class, and for Flink (consumer) a SerializationSchema and DeserializationSchema class. Any example of how to put this together would be greatly appreciated. On Thu, Nov 17, 2016 at 9:12 PM, Dromit <[hidden email]> wrote:
|
A common choice is Apache Avro. You can to define a schema for you Pojos and generate serializers and deserializers. 2016-11-18 5:11 GMT+01:00 Matt <[hidden email]>:
|
In reply to this post by Matt
Hi Matt,
There’s actually a related JIRA for this: https://issues.apache.org/jira/browse/FLINK-4050. The corresponding PR is https://github.com/apache/flink/pull/2705, which adds wrappers for the Kafka serializers. Is this feature what you’re probably looking for? Best Regards, Gordon On November 18, 2016 at 12:11:23 PM, Matt ([hidden email]) wrote:
|
Free forum by Nabble | Edit this page |