I recently started working on a PoC with Flink 1.3 that connects to our Kafka cluster and pulls Avro data. Here's the code:
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.Kafka09AvroTableSource;
import org.apache.flink.streaming.connectors.kafka.KafkaAvroTableSource;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class KafkaAvroQueries {

    public static final String sql = String.join(
            System.getProperty("line.separator"),
            "SELECT *",
            "FROM Pings"
    );

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // Set up the Kafka (Ping) table source; Ping is our Avro-generated
        // SpecificRecord class.
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "...");
        properties.setProperty("client.id", "flink-poc");
        KafkaAvroTableSource pingTable = new Kafka09AvroTableSource(
                "pings",
                properties,
                Ping.class
        );
        tEnv.registerTableSource("Pings", pingTable);

        // Run the query on the Avro Pings
        Table resultTable = tEnv.sql(KafkaAvroQueries.sql);

        // Output to the console
        ConsoleAppendStreamTableSink tableSink = new ConsoleAppendStreamTableSink();
        TableSchema tableSchema = resultTable.getSchema();
        tableSink.configure(tableSchema.getColumnNames(), tableSchema.getTypes());
        tEnv.writeToSink(resultTable, tableSink, new StreamQueryConfig());

        // Kick off the job (the exception below is thrown before this is reached)
        env.execute("kafka-avro-poc");
    }
}
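
For reference, the console sink is trivial. Here's a minimal sketch of that kind of print-only sink, written against Flink 1.3's AppendStreamTableSink interface (illustrative rather than my exact class, but it does the same thing):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;

public class ConsoleAppendStreamTableSink implements AppendStreamTableSink<Row> {

    private String[] fieldNames;
    private TypeInformation<?>[] fieldTypes;

    @Override
    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        // Store the schema of the table being emitted. Returning `this` (rather
        // than a configured copy, as Flink's built-in sinks do) keeps the call
        // in main(), which ignores the return value, working.
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
        return this;
    }

    @Override
    public String[] getFieldNames() {
        return fieldNames;
    }

    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return fieldTypes;
    }

    @Override
    public TypeInformation<Row> getOutputType() {
        return new RowTypeInfo(fieldTypes, fieldNames);
    }

    @Override
    public void emitDataStream(DataStream<Row> dataStream) {
        // All the sink does: print each row to stdout.
        dataStream.print();
    }
}

Nothing in there holds a non-serializable field, so I don't think the sink is the problem.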
ConsoleAppendStreamTableSink is just a simple TableSink I created while looking at the different TableSink types; as the sketch above shows, it just calls print() on the DataStream. Ping is an auto-generated SpecificRecord compiled from an Avro schema.

I'm getting a NotSerializableException inside FlinkKafkaConsumerBase, specifically on the SpecificDatumReader it uses internally. Here's the exception:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the FlinkKafkaConsumerBase is not serializable. The object probably contains or references non serializable fields.
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1460)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1404)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1386)
    at org.apache.flink.streaming.connectors.kafka.KafkaTableSource.getDataStream(KafkaTableSource.java:78)
    at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.translateToPlan(StreamTableSourceScan.scala:106)
    at org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:678)
    at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:637)
    at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:214)
    at com.jwplayer.flink.KafkaAvroQueries.main(KafkaAvroQueries.java:42)
Caused by: java.io.NotSerializableException: org.apache.avro.specific.SpecificDatumReader
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
    ... 10 more

This looks to be a straightforward use of the Kafka 0.9 connector, so I'm not sure why I'm running into a serialization issue. Am I missing something obvious? I'm running this under the debugger inside IntelliJ, not on a cluster, though I'm not sure why that would matter.

Any help is greatly appreciated!

Morrigan Jones
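
P.S. Ping itself is generated with the standard Avro compiler, so it extends SpecificRecordBase. Our actual schema shouldn't matter here; anyone trying to reproduce this could use any simple record schema, for example something like the following (the names and fields are hypothetical, not our real ones):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class PingSchemaExample {

    // A made-up schema of the same general shape; the Avro compiler would turn
    // this into a Ping class extending SpecificRecordBase.
    static final Schema PING_SCHEMA = SchemaBuilder
            .record("Ping").namespace("com.example")  // hypothetical namespace
            .fields()
            .requiredString("deviceId")               // hypothetical field
            .requiredLong("timestamp")                // hypothetical field
            .endRecord();

    public static void main(String[] args) {
        // Prints the equivalent .avsc JSON.
        System.out.println(PING_SCHEMA.toString(true));
    }
}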
