SQL for Avro GenericRecords on Parquet


SQL for Avro GenericRecords on Parquet

Hanan Yehudai

I have tried to persist generic Avro records in a Parquet file and then read them back via ParquetTableSource, using SQL.

It seems that the SQL is not executed properly.

 

The persisted records are:

id, type
3333333,Type1
222222,Type2
3333333,Type1
222222,Type2
3333333,Type1
222222,Type2
3333333,Type1
222222,Type2
3333333,Type1
222222,Type2
3333333,Type1
222222,Type2

A plain SELECT id, recordType_ FROM ParquetTable returns the rows above (which is correct), while running

    SELECT id, recordType_ FROM ParquetTable WHERE recordType_='Type1'

results in:

3333333,Type1
222222,Type1
3333333,Type1
222222,Type1
3333333,Type1
222222,Type1
3333333,Type1
222222,Type1
3333333,Type1
222222,Type1
3333333,Type1
222222,Type1

As if the equals sign were an assignment rather than a comparison...

Am I doing something wrong? Or is it an issue of GenericRecord vs. SpecificRecord?

 

 


Re: SQL for Avro GenericRecords on Parquet

Peter Huang
Hi Hanan,

Thanks for reporting the issue. Would you please attach your test code here? I can help investigate.



Best Regards
Peter Huang




RE: SQL for Avro GenericRecords on Parquet

Hanan Yehudai

Hi Peter, thanks.

This is my code. I used one of the Parquet/Avro tests as a reference.

 

The code will fail on:

Test testScan(ParquetTestCase) failed with:
java.lang.UnsupportedOperationException
    at org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate$ValueInspector.update(IncrementallyUpdatedFilterPredicate.java:71)
    at org.apache.parquet.filter2.recordlevel.FilteringPrimitiveConverter.addLong(FilteringPrimitiveConverter.java:105)
    at org.apache.parquet.column.impl.ColumnReaderImpl$2$4.writeValue(ColumnReaderImpl.java:268)

 

 

CODE:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.generic.IndexedRecord;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetTableSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.types.Row;

import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.schema.MessageType;

import org.junit.BeforeClass;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class ParquetTestCase extends MultipleProgramsTestBase {

    private static String avroSchema = "{\n" +
            "  \"name\": \"SimpleRecord\",\n" +
            "  \"type\": \"record\",\n" +
            "  \"fields\": [\n" +
            "    { \"default\": null, \"name\": \"timestamp_edr\", \"type\": [ \"null\", \"long\" ]},\n" +
            "    { \"default\": null, \"name\": \"id\", \"type\": [ \"null\", \"long\" ]},\n" +
            "    { \"default\": null, \"name\": \"recordType_\", \"type\": [ \"null\", \"string\"]}\n" +
            "  ],\n" +
            "  \"schema_id\": 1,\n" +
            "  \"type\": \"record\"\n" +
            "}";

    private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter();
    private static Schema schm = new Schema.Parser().parse(avroSchema);
    private static Path testPath;

    public ParquetTestCase() {
        super(TestExecutionMode.COLLECTION);
    }

    @BeforeClass
    public static void setup() throws Exception {
        GenericRecordBuilder genericRecordBuilder = new GenericRecordBuilder(schm);

        // Build 6 pairs of generic records: (3333333, Type1) and (222222, Type2).
        List<IndexedRecord> recs = new ArrayList<>();
        for (int i = 0; i < 6; i++) {
            GenericRecord gr = genericRecordBuilder.set("timestamp_edr", System.currentTimeMillis() / 1000).set("id", 3333333L).set("recordType_", "Type1").build();
            recs.add(gr);
            GenericRecord gr2 = genericRecordBuilder.set("timestamp_edr", System.currentTimeMillis() / 1000).set("id", 222222L).set("recordType_", "Type2").build();
            recs.add(gr2);
        }

        // Write the records to a Parquet file under /tmp.
        testPath = new Path("/tmp", UUID.randomUUID().toString());
        ParquetWriter<IndexedRecord> writer = AvroParquetWriter.<IndexedRecord>builder(
                new org.apache.hadoop.fs.Path(testPath.toUri())).withSchema(schm).build();
        for (IndexedRecord record : recs) {
            writer.write(record);
        }
        writer.close();
    }

    private ParquetTableSource createParquetTableSource(Path path) throws IOException {
        MessageType nestedSchema = SCHEMA_CONVERTER.convert(schm);
        ParquetTableSource parquetTableSource = ParquetTableSource.builder()
                .path(path.getPath())
                .forParquetSchema(nestedSchema)
                .build();
        return parquetTableSource;
    }

    @Test
    public void testScan() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        BatchTableEnvironment batchTableEnvironment = BatchTableEnvironment.create(env);
        ParquetTableSource tableSource = createParquetTableSource(testPath);
        batchTableEnvironment.registerTableSource("ParquetTable", tableSource);

        Table tab = batchTableEnvironment.sqlQuery("select id, recordType_ from ParquetTable where id > 222222");

        DataSet<Row> result = batchTableEnvironment.toDataSet(tab, Row.class);
        result.print();
    }
}

 

 



Re: SQL for Avro GenericRecords on Parquet

Peter Huang
Hi Hanan,

After investigating the issue with the test case you provided, I think there is a bug in the filter push-down. Currently, the Parquet predicate push-down uses the type of the predicate literal to construct the FilterPredicate.
The issue happens when the type of the literal inferred from the SQL doesn't match the Parquet schema. For example, if foo is a long column and the predicate is foo < 1, the literal is recognized as an integer, so the Parquet FilterPredicate is mistakenly created for a column of Integer type. I created a ticket for the issue: https://issues.apache.org/jira/browse/FLINK-14953. Please add further insight by commenting directly on it.
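
To make the type mismatch concrete, here is a minimal sketch using Parquet's FilterApi (the id column and its INT64 type come from the schema in your test; exactly which predicate the push-down builds is my assumption here, not code copied from Flink):

import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;

public class PredicateTypeMismatch {

    public static void main(String[] args) {
        // "id" is an Avro long, i.e. an INT64 column in the Parquet file.

        // Roughly what gets built when the SQL literal is inferred as an integer:
        // an Integer-typed predicate bound to the long column. The record-level
        // filter then has no update(long) implementation for that inspector,
        // which matches the UnsupportedOperationException thrown from
        // FilteringPrimitiveConverter.addLong in the stack trace above.
        FilterPredicate mismatched = FilterApi.gt(FilterApi.intColumn("id"), 222222);

        // What the INT64 column actually requires: a Long-typed predicate.
        FilterPredicate correct = FilterApi.gt(FilterApi.longColumn("id"), 222222L);

        System.out.println(mismatched);
        System.out.println(correct);
    }
}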


Best Regards
Peter Huang



Re: SQL for Avro GenericRecords on Parquet

Peter Huang
Hi Hanan,

I created a fix for the problem. Would you please try it on your side?
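
In case it helps while you try it: following the diagnosis above, the general shape of such a fix is to take the literal type from the Parquet schema rather than from the SQL literal. A rough sketch only, with a hypothetical helper, not the actual patch:

import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;

public class SchemaAwarePredicates {

    // Hypothetical helper: build an equality predicate whose literal type is
    // derived from the column's Parquet primitive type.
    static FilterPredicate eq(MessageType schema, String column, Number literal) {
        PrimitiveTypeName type =
                schema.getType(column).asPrimitiveType().getPrimitiveTypeName();
        switch (type) {
            case INT64:
                return FilterApi.eq(FilterApi.longColumn(column), literal.longValue());
            case INT32:
                return FilterApi.eq(FilterApi.intColumn(column), literal.intValue());
            case DOUBLE:
                return FilterApi.eq(FilterApi.doubleColumn(column), literal.doubleValue());
            case FLOAT:
                return FilterApi.eq(FilterApi.floatColumn(column), literal.floatValue());
            default:
                // Safer to skip the push-down than to push a predicate of the wrong type.
                return null;
        }
    }
}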


Best Regards
Peter Huang
