SinkFunction invoke method signature

Philip Doctor
Dear Flink Users,
I noticed my sink's `invoke` method was deprecated, so I went to update it. I'm a little surprised by the new method signature, especially on Context (copied below for ease of discussion). Based on the docs, shouldn't Context be Context<IN> rather than Context<T>? I'm having a hard time understanding what is being passed to me in Context. Does anyone have any insight into why these might be different?
 
/**
 * Interface for implementing user defined sink functionality.
 *
 * @param <IN> Input type parameter.
 */
@Public
public interface SinkFunction<IN> extends Function, Serializable {

/**
 * Context that {@link SinkFunction SinkFunctions } can use for getting additional data about
 * an input record.
 *
 * <p>The context is only valid for the duration of a
 * {@link SinkFunction#invoke(Object, Context)} call. Do not store the context and use
 * afterwards!
 *
 * @param <T> The type of elements accepted by the sink.
 */
@Public // Interface might be extended in the future with additional methods.
interface Context<T> {

org.apache.flink/flink-streaming-java_2.11/1.5.0/784a58515da2194a2e74666e8d53d50fac8c03/flink-streaming-java_2.11-1.5.0-sources.jar!/org/apache/flink/streaming/api/functions/sink/SinkFunction.java

Re: SinkFunction invoke method signature

Ashwin Sinha
+1

We encountered the exact same issue today. 

On Fri, Jul 13, 2018 at 10:51 PM Philip Doctor <[hidden email]> wrote:


--
Ashwin Sinha | Data Engineer

Re: SinkFunction invoke method signature

Chesnay Schepler
The type variables T and IN aren't related to each other.

That is, whether the context interface is declared as Context<T> or Context<IN> makes no difference,
since nested interfaces are always static.

At runtime, the context given to the function should be of type Context<IN>,
but I don't know why the invoke method (and StreamSink, for that matter) uses a raw Context parameter.
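
To make the point above concrete, here is a minimal sketch in plain Java. It does not use Flink: `Sink`, `CollectingSink` and the single `timestamp()` method are hypothetical stand-ins chosen only to mirror the shape under discussion (a generic outer interface, a nested generic interface, and a raw `Context` parameter on `invoke`).

```java
import java.util.ArrayList;
import java.util.List;

public class NestedInterfaceDemo {

    // Hypothetical stand-in for SinkFunction<IN>; this is not the Flink interface.
    interface Sink<IN> {

        // A member interface is implicitly static, so the outer IN is not in
        // scope here. T is a separate type variable; naming it IN instead
        // would declare a new variable that merely shares the name.
        interface Context<T> {
            Long timestamp();
        }

        // Raw Context in the parameter list, as described in this thread.
        void invoke(IN value, Context context) throws Exception;
    }

    // Implementation for String records; the override also uses raw Context.
    static class CollectingSink implements Sink<String> {
        final List<String> seen = new ArrayList<>();

        @Override
        public void invoke(String value, Context context) {
            seen.add(value + "@" + context.timestamp());
        }
    }

    public static List<String> run() throws Exception {
        CollectingSink sink = new CollectingSink();
        // The caller can still pass a fully parameterized Context<String>.
        Sink.Context<String> ctx = () -> 42L;
        sink.invoke("a", ctx);
        return sink.seen;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // prints [a@42]
    }
}
```

Because the parameter is raw, the compiler never ties the context's type argument to the sink's element type; that link exists only by convention on the caller's side.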

On 13.07.2018 19:35, Ashwin Sinha wrote:

Re: SinkFunction invoke method signature

Philip Doctor

> That is, whether the context interface is defined as Context<T> or Context<IN> makes no difference,
> since it is an interface which are always static.


I don't think this is the case. Context<> is an inner interface; <IN> has a meaning in that scope and <T> does not, so there's a very real difference. When you go to consume it, you have to accept Context<*> in order to meet the requirements of the interface. In my example, I want to write:

override fun invoke(value: ByteArray, context: SinkFunction.Context<ByteArray>) {

but I can't; I have to write

override fun invoke(value: ByteArray, context: SinkFunction.Context<*>) {

in order to avoid a compile error and actually override the interface method.


This means that, as a consumer, I have no type information about Context<> and would need to unsafely downcast if I wanted to use it. This feels, at a minimum, like a confusing API to consume. Can you provide some guidance on how I would consume this other than by unsafely downcasting the contents of Context<>?
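
One way to look at the downcasting concern, sketched in plain Java rather than against Flink itself: if none of the context's methods mention T, a context of unknown element type can be read without any cast. The `Context` interface below is a hypothetical stand-in, and its method set is an assumption; readers should check the actual methods of SinkFunction.Context in their Flink version before relying on this.

```java
public class WildcardContextDemo {

    // Hypothetical stand-in for SinkFunction.Context<T>. The methods are an
    // assumption, picked so that no signature mentions T.
    interface Context<T> {
        long currentProcessingTime();

        Long timestamp(); // null when the record carries no timestamp
    }

    // Accepts a context of unknown element type, comparable to Context<*> in
    // Kotlin. Nothing here needs a cast because T never appears in a
    // return type or parameter type.
    public static String describe(Context<?> context) {
        Long ts = context.timestamp();
        return ts == null ? "no timestamp" : "timestamp=" + ts;
    }

    public static String run() {
        Context<byte[]> ctx = new Context<byte[]>() {
            @Override
            public long currentProcessingTime() {
                return 1000L;
            }

            @Override
            public Long timestamp() {
                return 7L;
            }
        };
        return describe(ctx);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints timestamp=7
    }
}
```

A cast would only become necessary for a method that returns or accepts T, which this stand-in deliberately does not have.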



physIQ


From: Chesnay Schepler <[hidden email]>
Sent: Saturday, July 14, 2018 3:54:33 AM
To: Ashwin Sinha; Philip Doctor
Cc: [hidden email]
Subject: Re: SinkFunction invoke method signature
 