Trying to implement UpsertStreamTableSink in Java

Trying to implement UpsertStreamTableSink in Java

Porritt, James

I put this class together when trying to create my own upsertable table sink in Java:

public class MyTableSink implements UpsertStreamTableSink<Row> {
    @Override
    public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        return null;
    }

    @Override
    public void setKeyFields(String[] keys) {
        System.out.println("setKeyFields" + keys);
    }

    @Override
    public void setIsAppendOnly(Boolean isAppendOnly) {
    }

    @Override
    public String[] getFieldNames() {
        return new String[0];
    }

    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return new TypeInformation[0];
    }

    @Override
    public TypeInformation<Row> getRecordType() {
        return null;
    }

    @Override
    public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
        return new TupleTypeInfo<Tuple2<Boolean, Row>>();
    }

    @Override
    public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        dataStream.print();
    }
}

I try and link it to my StreamTable with:

mystreamtable.writeToSink(new MyTableSink());

For some reason though I’m getting the error:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:545)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781)
        at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
Caused by: org.apache.flink.table.api.TableException: Stream Tables can only be emitted by AppendStreamTableSink, RetractStreamTableSink, or UpsertStreamTableSink.
        at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:284)
        at org.apache.flink.table.api.Table.writeToSink(table.scala:862)
        at org.apache.flink.table.api.Table.writeToSink(table.scala:830)
        at alphagen_stats.KafkaAlphaGen.main(KafkaAlphaGen.java:265)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
        ... 12 more

What am I doing wrong?

######################################################################
The information contained in this communication is confidential and
intended only for the individual(s) named above. If you are not a named
addressee, please notify the sender immediately and delete this email
from your system and do not disclose the email or any part of it to any
person. The views expressed in this email are the views of the author
and do not necessarily represent the views of Millennium Capital Partners
LLP (MCP LLP) or any of its affiliates. Outgoing and incoming electronic
communications of MCP LLP and its affiliates, including telephone
communications, may be electronically archived and subject to review
and/or disclosure to someone other than the recipient. MCP LLP is
authorized and regulated by the Financial Conduct Authority. Millennium
Capital Partners LLP is a limited liability partnership registered in
England & Wales with number OC312897 and with its registered office at
50 Berkeley Street, London, W1J 8HD.
######################################################################

Re: Trying to implement UpsertStreamTableSink in Java

Timo Walther
Hi James,

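The method `Table.writeToSink()` calls `configure(String[] fieldNames, TypeInformation<?>[] fieldTypes)` internally and then works with the sink that `configure` returns. Since you return null, you are trying to register null instead of a table sink. Null is not one of the three supported sink types, hence the exception. Return a configured table sink from `configure` instead.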
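To make the mechanism concrete, here is a minimal, self-contained sketch of the pattern. It deliberately does not use the Flink classes: `Sink`, `BrokenSink`, `FixedSink` and this `writeToSink` are hypothetical stand-ins, and the type check in `writeToSink` is an assumption about how the caller rejects the null result, not a copy of Flink's code. It only shows why a `configure` that returns null fails and what a `configure` that returns a configured copy looks like.

```java
import java.util.Arrays;

public class ConfigureSketch {

    // Hypothetical stand-in for a table sink interface; NOT the Flink API.
    interface Sink {
        Sink configure(String[] fieldNames);
        String[] getFieldNames();
    }

    // Mirrors the sink from the question: configure() returns null.
    static class BrokenSink implements Sink {
        @Override
        public Sink configure(String[] fieldNames) {
            return null;
        }

        @Override
        public String[] getFieldNames() {
            return new String[0];
        }
    }

    // configure() returns a new sink instance that remembers the schema.
    static class FixedSink implements Sink {
        private String[] fieldNames = new String[0];

        @Override
        public Sink configure(String[] fieldNames) {
            FixedSink copy = new FixedSink();
            copy.fieldNames = fieldNames.clone();
            return copy;
        }

        @Override
        public String[] getFieldNames() {
            return fieldNames.clone();
        }
    }

    // Stand-in for the caller: it configures the sink first and then
    // checks the type of the RESULT, not of the original object.
    static String writeToSink(Sink sink, String[] fieldNames) {
        Sink configured = sink.configure(fieldNames);
        // null is never an instance of any type, so this check rejects it.
        if (!(configured instanceof Sink)) {
            throw new IllegalStateException("configure() did not return a supported sink");
        }
        return "emitting fields " + Arrays.toString(configured.getFieldNames());
    }

    public static void main(String[] args) {
        String[] schema = {"id", "count"};
        System.out.println(writeToSink(new FixedSink(), schema));
        try {
            writeToSink(new BrokenSink(), schema);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In the real sink the same idea would presumably apply: have `configure` create a new `MyTableSink`, store `fieldNames` and `fieldTypes` on it, return it, and let `getFieldNames()` and `getFieldTypes()` return the stored values.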

I hope this helps.

Regards,
Timo


On 23.07.18 at 14:33, Porritt, James wrote:
