I recently started working on a PoC with Flink 1.3 that connects to our Kafka cluster and pulls Avro data. Here's the code:
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.Kafka09AvroTableSource;
import org.apache.flink.streaming.connectors.kafka.KafkaAvroTableSource;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class KafkaAvroQueries {

    public final static String sql = String.join(
            System.getProperty("line.separator"),
            "SELECT *",
            "FROM Pings");

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // Set up the Kafka (Ping) table
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "...");
        properties.setProperty("client.id", "flink-poc");
        KafkaAvroTableSource pingTable = new Kafka09AvroTableSource(
                "pings",
                properties,
                Ping.class);
        tEnv.registerTableSource("Pings", pingTable);

        // Run the query on the Avro Pings table
        Table resultTable = tEnv.sql(KafkaAvroQueries.sql);

        // Output to the console
        ConsoleAppendStreamTableSink tableSink = new ConsoleAppendStreamTableSink();
        TableSchema tableSchema = resultTable.getSchema();
        tableSink.configure(tableSchema.getColumnNames(), tableSchema.getTypes());
        tEnv.writeToSink(resultTable, tableSink, new StreamQueryConfig());

        env.execute();
    }
}

Ping is an auto-generated SpecificRecord from an Avro schema. ConsoleAppendStreamTableSink is just a simple TableSink I created while looking at the different table sink types; it just calls print() on the DataStream.
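For reference, the sink is essentially this, a minimal sketch from memory; the TableSinkBase/RowTypeInfo wiring is just how I happened to put it together against the Flink 1.3 table sink interfaces:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSinkBase;
import org.apache.flink.types.Row;

// Minimal append-only sink that prints every row to stdout.
public class ConsoleAppendStreamTableSink extends TableSinkBase<Row>
        implements AppendStreamTableSink<Row> {

    @Override
    public TypeInformation<Row> getOutputType() {
        // Row type derived from the field names/types passed to configure()
        return new RowTypeInfo(getFieldTypes(), getFieldNames());
    }

    @Override
    protected ConsoleAppendStreamTableSink copy() {
        return new ConsoleAppendStreamTableSink();
    }

    @Override
    public void emitDataStream(DataStream<Row> dataStream) {
        dataStream.print();
    }
}

TableSinkBase holds the field names/types handed over via configure(), so the sink itself only has to say how to emit the stream.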
I'm getting a NotSerializableException inside FlinkKafkaConsumerBase, specifically on the SpecificDatumReader it uses internally. Here's the exception:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the FlinkKafkaConsumerBase is not serializable. The object probably contains or references non serializable fields.
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1460)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1404)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1386)
    at org.apache.flink.streaming.connectors.kafka.KafkaTableSource.getDataStream(KafkaTableSource.java:78)
    at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.translateToPlan(StreamTableSourceScan.scala:106)
    at org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:678)
    at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:637)
    at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:214)
    at com.jwplayer.flink.KafkaAvroQueries.main(KafkaAvroQueries.java:42)
Caused by: java.io.NotSerializableException: org.apache.avro.specific.SpecificDatumReader
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
    ... 10 more

This looks to be a straightforward use of the Kafka 0.9 connector, so I'm not sure why I'm running into a serialization issue. Am I missing something obvious? I'm running this with the debugger inside IntelliJ, not on a cluster, though I'm not sure why that would matter.

Any help is greatly appreciated!

Morrigan Jones