Getting key from keyed stream


Getting key from keyed stream

Paul Joireman

Hi all,


Is there a simple way to read the key from a KeyedStream? I'm trying to read messages from Kafka, separate them by a field in the message, and write each original message back to Kafka using that field as the new topic. I chose to partition the incoming stream by creating a KeyedStream with that field as the key. The only thing left is to write the message to Kafka with a producer, but I need to know the topic to write to, and for that I need to be able to read the key. Is there a way to do this?


Is there a better way to do this than using a KeyedStream?


Paul

Re: Getting key from keyed stream

Jamie Grier

A simpler and more efficient approach would be the following:

val stream = env.addSource(new FlinkKafkaConsumer(...))

stream
  .addSink(new FlinkKafkaProducer(new MyKeyedSerializationSchema(...)))

env.execute()

In MyKeyedSerializationSchema, just override the getTargetTopic() method so that it returns the topic for each element.

That should do it :)
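
For readers who want to see the shape of such a schema, here is a minimal sketch, not the code from this thread. It assumes a hypothetical Message type whose category field selects the topic and whose raw field holds the original message text. The KeyedSerializationSchema trait defined here is a local stand-in that mirrors the three methods of Flink's interface of the same name, so the sketch runs without Flink on the classpath; in a real job you would import Flink's interface instead of defining it.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Local stand-in (assumption): mirrors the three methods of Flink's
// KeyedSerializationSchema[T] so this sketch compiles on its own.
// In a real job, import the Flink interface and delete this trait.
trait KeyedSerializationSchema[T] extends Serializable {
  def serializeKey(element: T): Array[Byte]
  def serializeValue(element: T): Array[Byte]
  def getTargetTopic(element: T): String
}

// Hypothetical message type: `category` is the field used to pick the topic,
// `raw` is the original message text read from Kafka.
final case class Message(category: String, raw: String)

class MyKeyedSerializationSchema extends KeyedSerializationSchema[Message] {
  // Use the routing field as the Kafka record key as well.
  override def serializeKey(element: Message): Array[Byte] =
    element.category.getBytes(UTF_8)

  // Write the original message back unchanged.
  override def serializeValue(element: Message): Array[Byte] =
    element.raw.getBytes(UTF_8)

  // The topic is chosen per element, so no KeyedStream is needed.
  override def getTargetTopic(element: Message): String =
    element.category
}
```

Because the topic is derived from each element inside the sink, the stream does not need to be keyed at all, which avoids the network shuffle that keyBy would introduce.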

-Jamie

On Thu, Jan 12, 2017 at 12:53 PM, Paul Joireman <[hidden email]> wrote:


--

Jamie Grier
data Artisans, Director of Applications Engineering

Re: Getting key from keyed stream

Paul Joireman

Thanks Jamie,


I just figured that out after some digging and a little trial and error. It works great.


Paul


From: Jamie Grier <[hidden email]>
Sent: Thursday, January 12, 2017 4:59:43 PM
To: [hidden email]
Subject: Re: Getting key from keyed stream
 
