Avro serialization issue with Kafka09AvroTableSource

Morrigan Jones
I recently started working on a PoC with Flink 1.3 that connects to our Kafka cluster and pulls Avro data. Here's the code:

import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.Kafka09AvroTableSource;
import org.apache.flink.streaming.connectors.kafka.KafkaAvroTableSource;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class KafkaAvroQueries {
    public final static String sql = String.join(
            System.getProperty("line.separator"),
            "SELECT *",
            "FROM Pings"
    );

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // Set up the Kafka (Ping) table source
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "...");
        properties.setProperty("client.id", "flink-poc");
        KafkaAvroTableSource pingTable = new Kafka09AvroTableSource(
                "pings",
                properties,
                Ping.class
        );
        tEnv.registerTableSource("Pings", pingTable);

        // Run the query on the Avro Pings
        Table resultTable = tEnv.sql(KafkaAvroQueries.sql);

        // Output to the console
        ConsoleAppendStreamTableSink tableSink = new ConsoleAppendStreamTableSink();
        TableSchema tableSchema = resultTable.getSchema();
        tableSink.configure(tableSchema.getColumnNames(), tableSchema.getTypes());

        tEnv.writeToSink(resultTable, tableSink, new StreamQueryConfig());
    }
}

ConsoleAppendStreamTableSink is just a simple TableSink I created while looking at the different table sink types; it just calls print() on the DataStream (a rough sketch of it follows the stack trace below). Ping is an auto-generated SpecificRecord class from an Avro schema. I'm getting a NotSerializableException inside FlinkKafkaConsumerBase, specifically on the SpecificDatumReader it uses internally. Here's the exception:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the FlinkKafkaConsumerBase is not serializable. The object probably contains or references non serializable fields.
	at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1460)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1404)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1386)
	at org.apache.flink.streaming.connectors.kafka.KafkaTableSource.getDataStream(KafkaTableSource.java:78)
	at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.translateToPlan(StreamTableSourceScan.scala:106)
	at org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:678)
	at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:637)
	at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:214)
	at com.jwplayer.flink.KafkaAvroQueries.main(KafkaAvroQueries.java:42)
Caused by: java.io.NotSerializableException: org.apache.avro.specific.SpecificDatumReader
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
	at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
	... 10 more
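
For reference, ConsoleAppendStreamTableSink is roughly the following (a minimal sketch, assuming Row as the output type and Flink 1.3's AppendStreamTableSink interface):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;

public class ConsoleAppendStreamTableSink implements AppendStreamTableSink<Row> {
    private String[] fieldNames;
    private TypeInformation<?>[] fieldTypes;

    @Override
    public void emitDataStream(DataStream<Row> dataStream) {
        // Just dump each appended row to stdout
        dataStream.print();
    }

    @Override
    public TypeInformation<Row> getOutputType() {
        return new RowTypeInfo(fieldTypes, fieldNames);
    }

    @Override
    public String[] getFieldNames() {
        return fieldNames;
    }

    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return fieldTypes;
    }

    @Override
    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        // configure() is expected to return a configured copy of the sink
        ConsoleAppendStreamTableSink configured = new ConsoleAppendStreamTableSink();
        configured.fieldNames = fieldNames;
        configured.fieldTypes = fieldTypes;
        return configured;
    }
}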

This looks to be a straightforward use of the Kafka 0.9 connector, so I'm not sure why I'm running into a serialization issue. Am I missing something obvious? I'm running this under the debugger inside IntelliJ, not on a cluster, though I'm not sure why that would matter. Any help is greatly appreciated!
