Writing Tuple2 to a sink

Mohit Anchlia
What's the best way to retrieve both the values in Tuple2 inside a custom sink given that the type is not known inside the sink function?

Re: Writing Tuple2 to a sink

Biao Liu
Hi
I think FlinkKafkaProducerBase.java shows a good way to deal with this situation. It takes a KeyedSerializationSchema that the user has to implement. The KeyedSerializationSchema is used to serialize the data, so the SinkFunction only needs to understand the type after serialization.
In your case, I think you can have your SinkFunction accept a SerializationSchema, which the user then has to implement, perhaps as a Tuple2SerializationSchema.
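
A minimal sketch of that idea follows, under stated assumptions. So that it runs without Flink on the classpath, `Tuple2`, `SinkFunction`, and `SerializationSchema` are small stand-ins that only mirror the shape of Flink's types; `Tuple2SerializationSchema` and `BytesSink` are hypothetical names, and the in-memory list stands in for whatever external system the sink writes to.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Stand-in for Flink's Tuple2 (public fields f0 and f1), so the sketch runs without Flink.
class Tuple2<T0, T1> {
    public T0 f0;
    public T1 f1;
    Tuple2(T0 f0, T1 f1) { this.f0 = f0; this.f1 = f1; }
}

// Stand-in with the same shape as Flink's SinkFunction.
interface SinkFunction<IN> {
    void invoke(IN value) throws Exception;
}

// Stand-in with the same shape as Flink's SerializationSchema.
interface SerializationSchema<T> {
    byte[] serialize(T element);
}

// Hypothetical schema: the only place that knows the tuple's field types.
class Tuple2SerializationSchema implements SerializationSchema<Tuple2<String, Integer>> {
    @Override
    public byte[] serialize(Tuple2<String, Integer> element) {
        return (element.f0 + "," + element.f1).getBytes(StandardCharsets.UTF_8);
    }
}

// Hypothetical sink: generic in T, it only ever handles the serialized bytes.
class BytesSink<T> implements SinkFunction<T> {
    private final SerializationSchema<T> schema;
    final List<byte[]> written = new ArrayList<>(); // stands in for the external system

    BytesSink(SerializationSchema<T> schema) { this.schema = schema; }

    @Override
    public void invoke(T value) {
        written.add(schema.serialize(value));
    }
}

class SchemaSinkDemo {
    public static void main(String[] args) {
        BytesSink<Tuple2<String, Integer>> sink = new BytesSink<>(new Tuple2SerializationSchema());
        sink.invoke(new Tuple2<>("a", 1));
        System.out.println(new String(sink.written.get(0), StandardCharsets.UTF_8)); // prints "a,1"
    }
}
```

The point of the design is that the sink stays type-agnostic: swapping in a different schema changes the record type without touching the sink itself.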

Re: Writing Tuple2 to a sink

Mohit Anchlia
This works for Kafka, but for other types of sinks am I supposed to use some kind of OutputFormat?

Re: Writing Tuple2 to a sink

Biao Liu
Currently, OutputFormat is used for DataSet and SinkFunction is used for DataStream. Maybe I misunderstood your problem. It would help if you gave more details.

Re: Writing Tuple2 to a sink

Tzu-Li (Gordon) Tai
Hi Mohit,

I don’t completely understand your question, but I’m assuming that you know the type of records your custom sink will receive and just don’t know how to extract values from them.

Assume that the type of the incoming records will be `Tuple2<String, Integer>`. When writing your custom sink, declare that type as the type parameter of `SinkFunction`:

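```
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class YourCustomSink implements SinkFunction<Tuple2<String, Integer>> {
    // ...

    @Override
    public void invoke(Tuple2<String, Integer> next) throws Exception {
        // use next.f0 / next.f1 to retrieve values from the tuple
    }

    // ...
}
```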

You can of course also define generic types to replace `String` and `Integer`, like so:

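```
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class YourCustomSink<F, S> implements SinkFunction<Tuple2<F, S>> {
    // ...

    @Override
    public void invoke(Tuple2<F, S> next) throws Exception {
        F field1 = next.f0; // first value of the tuple
        S field2 = next.f1; // second value of the tuple
        // ...
    }

    // ...
}
```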

Just replace the generic types with concrete types when instantiating your custom sink, according to your topology.
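
As a rough illustration of that last step, here is a sketch that can be run without Flink on the classpath. `Tuple2` and `SinkFunction` are minimal stand-ins that only mirror the shape of Flink's types, the `lines` list is a hypothetical placeholder for the real output, and `GenericSinkDemo` is an illustrative name.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for Flink's Tuple2 (public fields f0 and f1), so the sketch runs without Flink.
class Tuple2<T0, T1> {
    public T0 f0;
    public T1 f1;
    Tuple2(T0 f0, T1 f1) { this.f0 = f0; this.f1 = f1; }
}

// Stand-in with the same shape as Flink's SinkFunction.
interface SinkFunction<IN> {
    void invoke(IN value) throws Exception;
}

// The generic sink from above, with a list standing in for the real output.
class YourCustomSink<F, S> implements SinkFunction<Tuple2<F, S>> {
    final List<String> lines = new ArrayList<>();

    @Override
    public void invoke(Tuple2<F, S> next) {
        F field1 = next.f0;
        S field2 = next.f1;
        lines.add(field1 + "=" + field2);
    }
}

class GenericSinkDemo {
    public static void main(String[] args) {
        // The concrete types are fixed at instantiation and must match the stream's element type.
        YourCustomSink<String, Integer> sink = new YourCustomSink<>();
        sink.invoke(new Tuple2<>("clicks", 42));
        System.out.println(sink.lines); // prints [clicks=42]
    }
}
```

In an actual job the sink would be attached with `stream.addSink(new YourCustomSink<String, Integer>())`, where `stream` is a `DataStream<Tuple2<String, Integer>>`.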

Let me know if this answers your question!

Cheers,
Gordon
