RE: SQL for Avro GenericRecords on Parquet

Posted by Hanan Yehudai on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/SQL-for-Avro-GenericRecords-on-Parquet-tp31048p31055.html

Hi Peter, thanks.

This is my code. I used one of the Parquet/Avro tests as a reference.

 

The code fails with:

Test testScan(ParquetTestCase) failed with:

java.lang.UnsupportedOperationException

               at org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate$ValueInspector.update(IncrementallyUpdatedFilterPredicate.java:71)

               at org.apache.parquet.filter2.recordlevel.FilteringPrimitiveConverter.addLong(FilteringPrimitiveConverter.java:105)

               at org.apache.parquet.column.impl.ColumnReaderImpl$2$4.writeValue(ColumnReaderImpl.java:268)

 

 

CODE :

 

import org.apache.avro.Schema;

import org.apache.avro.generic.GenericRecord;

import org.apache.avro.generic.GenericRecordBuilder;

import org.apache.avro.specific.SpecificRecord;

import org.apache.avro.specific.SpecificRecordBuilderBase;

import org.apache.flink.api.common.typeinfo.Types;

import org.apache.flink.api.java.DataSet;

import org.apache.flink.api.java.ExecutionEnvironment;

import org.apache.flink.api.java.io.ParallelIteratorInputFormat;

import org.apache.flink.api.java.io.TupleCsvInputFormat;

import org.apache.flink.api.java.tuple.Tuple;

import org.apache.flink.core.fs.FileSystem;

import org.apache.flink.core.fs.Path;

 

import org.apache.flink.formats.parquet.ParquetTableSource;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;

import org.apache.flink.table.api.Table;

import org.apache.flink.table.api.TableEnvironment;

import org.apache.flink.table.api.java.BatchTableEnvironment;

 

import org.apache.flink.table.api.java.StreamTableEnvironment;

import org.apache.flink.table.sinks.CsvTableSink;

import org.apache.flink.table.sinks.TableSink;

import org.apache.flink.test.util.MultipleProgramsTestBase;

import org.apache.flink.types.Row;

 

import org.apache.avro.generic.IndexedRecord;

import org.apache.parquet.avro.AvroSchemaConverter;

import org.apache.parquet.schema.MessageType;

import org.junit.BeforeClass;

import org.junit.ClassRule;

import org.junit.Test;

import org.junit.rules.TemporaryFolder;

 

import java.io.IOException;

import java.util.ArrayList;

import java.util.List;

import java.util.UUID;

 

import static org.junit.Assert.assertEquals;

 

import org.apache.parquet.avro.AvroParquetWriter;

import org.apache.parquet.hadoop.ParquetWriter;

 

 

public class  ParquetTestCase extends MultipleProgramsTestBase {

 

    private static String avroSchema = "{\n" +

            "  \"name\": \"SimpleRecord\",\n" +

            "  \"type\": \"record\",\n" +

            "  \"fields\": [\n" +

            "    { \"default\": null, \"name\": \"timestamp_edr\", \"type\": [ \"null\", \"long\" ]},\n" +

            "    { \"default\": null, \"name\": \"id\", \"type\": [ \"null\", \"long\" ]},\n" +

            "    { \"default\": null, \"name\": \"recordType_\", \"type\": [ \"null\", \"string\"]}\n" +

            "  ],\n" +

            "  \"schema_id\": 1,\n" +

            "  \"type\": \"record\"\n" +

            "}";

 

    private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter();

    private static Schema schm = new Schema.Parser().parse(avroSchema);

    private static Path testPath;

 

 

    public ParquetTestCase() {

        super(TestExecutionMode.COLLECTION);

    }

 

 

    @BeforeClass

    public static void setup() throws Exception {

 

        GenericRecordBuilder genericRecordBuilder = new GenericRecordBuilder(schm);

 

 

        List<IndexedRecord> recs = new ArrayList<>();

        for (int i = 0; i < 6; i++) {

            GenericRecord gr = genericRecordBuilder.set("timestamp_edr", System.currentTimeMillis() / 1000).set("id", 3333333L).set("recordType_", "Type1").build();

            recs.add(gr);

            GenericRecord gr2 = genericRecordBuilder.set("timestamp_edr", System.currentTimeMillis() / 1000).set("id", 222222L).set("recordType_", "Type2").build();

            recs.add(gr2);

        }

 

        testPath = new Path("/tmp",  UUID.randomUUID().toString());

 

 

        ParquetWriter<IndexedRecord> writer = AvroParquetWriter.<IndexedRecord>builder(

                new org.apache.hadoop.fs.Path(testPath.toUri())).withSchema(schm).build();

 

        for (IndexedRecord record : recs) {

            writer.write(record);

        }

        writer.close();

    }

 

 

    private ParquetTableSource createParquetTableSource(Path path) throws IOException {

        MessageType nestedSchema = SCHEMA_CONVERTER.convert(schm);

        ParquetTableSource parquetTableSource = ParquetTableSource.builder()

                .path(path.getPath())

                .forParquetSchema(nestedSchema)

                .build();

        return parquetTableSource;

    }

 

    @Test

    public void testScan() throws Exception {

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

 

        BatchTableEnvironment batchTableEnvironment  =    BatchTableEnvironment.create(env);

        ParquetTableSource tableSource = createParquetTableSource(testPath);

        batchTableEnvironment.registerTableSource("ParquetTable", tableSource);

 

         Table tab = batchTableEnvironment.sqlQuery("select id,recordType_  from ParquetTable where id > 222222 ");

 

        DataSet<Row> result = batchTableEnvironment.toDataSet(tab, Row.class);

 

        result.print();

 

    }

 

 

}

 

 

From: Peter Huang <[hidden email]>
Sent: Monday, November 18, 2019 7:22 PM
To: dev <[hidden email]>
Cc: [hidden email]
Subject: Re: SQL for Avro GenericRecords on Parquet

 

Hi Hanan,

 

Thanks for reporting the issue. Would you please attach your test code here? I may be able to help investigate.

 

 

 

Best Regards

Peter Huang

 

On Mon, Nov 18, 2019 at 2:51 AM Hanan Yehudai <[hidden email]> wrote:

I have tried to persist generic Avro records in a Parquet file and then read them via ParquetTableSource using SQL.
It seems that the SQL is not executed properly!

The persisted records are:
Id  ,  type
3333333,Type1
222222,Type2
3333333,Type1
222222,Type2
3333333,Type1
222222,Type2
3333333,Type1
222222,Type2
3333333,Type1
222222,Type2
3333333,Type1
222222,Type2

The query SELECT id, recordType_ FROM ParquetTable returns the above, which is correct.
However, running "SELECT id, recordType_ FROM ParquetTable WHERE recordType_='Type1'"
results in:
3333333,Type1
222222,Type1
3333333,Type1
222222,Type1
3333333,Type1
222222,Type1
3333333,Type1
222222,Type1
3333333,Type1
222222,Type1
3333333,Type1
222222,Type1

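Only the six Type1 rows should have been returned. Instead, all twelve rows come back with recordType_ set to Type1, as if the equals sign were treated as an assignment rather than a comparison…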

Am I doing something wrong? Is it an issue of GenericRecord vs. SpecificRecord?