access to key in sink

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

access to key in sink

robert
I am trying to access the keyBy value in the "open" method in a RichSink


Is there a way to access the actual keyBy value in the RichSink ?



DataStream<TestRecord> stream =
env.addSource(new FlinkKafkaConsumer08<>("test", schema, properties)
).setParallelism(1).keyBy("partition");
Reply | Threaded
Open this post in threaded view
|

Re: access to key in sink

Aljoscha Krettek
Hi,
it's not possible to access the key in the open method because without an element that is being processed there is no key. The user function is being used to produce elements of different keys that are being processed on the same shard (instance of a parallel operator). You can get the key manually by inspecting the element in the invoke() method of your sink.

Cheers,
Aljoscha

On Sat, 24 Dec 2016 at 17:15 Telco Phone <[hidden email]> wrote:
I am trying to access the keyBy value in the "open" method in a RichSink


Is there a way to access the actual keyBy value in the RichSink ?



DataStream<TestRecord> stream =
env.addSource(new FlinkKafkaConsumer08<>("test", schema, properties)
).setParallelism(1).keyBy("partition");
Reply | Threaded
Open this post in threaded view
|

Re: access to key in sink

Jark Wu
In reply to this post by robert
Hi Telco,

What do you mean about the “keyBy value” ?  Is it the string parameter value, i.e. “partition” in your case , or the real key value of an actual element being processed ? 

If you mean the string parameter value, it seems that currently it doesn’t support. If you mean the latter one, Aljoscha has pointed out.

- Jark Wu 

在 2016年12月25日,上午12:12,Telco Phone <[hidden email]> 写道:

I am trying to access the keyBy value in the "open" method in a RichSink


Is there a way to access the actual keyBy value in the RichSink ?



DataStream<TestRecord> stream =
env.addSource(new FlinkKafkaConsumer08<>("test", schema, properties)
).setParallelism(1).keyBy("partition");