Hi Peter, thanks.
This is my code; I used one of the Parquet/Avro tests as a reference.
The code fails with:
Test testScan(ParquetTestCase) failed with:
java.lang.UnsupportedOperationException
at org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate$ValueInspector.update(IncrementallyUpdatedFilterPredicate.java:71)
at org.apache.parquet.filter2.recordlevel.FilteringPrimitiveConverter.addLong(FilteringPrimitiveConverter.java:105)
at org.apache.parquet.column.impl.ColumnReaderImpl$2$4.writeValue(ColumnReaderImpl.java:268)
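Reading the stack trace, the failure happens in Parquet's record-level filtering when the long column value is handed to the pushed-down predicate, so my guess (unverified) is that the predicate was built for a different primitive type than the column's INT64. For reference, a minimal sketch using Parquet's own FilterApi to show the distinction; the column name is just the one from my schema, and the mismatch scenario is an assumption on my part, not something I have confirmed in the Flink code:

import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;

public class FilterTypeSketch {
    public static void main(String[] args) {
        // Predicate typed against the INT64 column: the value inspector can handle addLong().
        FilterPredicate asLong = FilterApi.gt(FilterApi.longColumn("id"), 222222L);

        // Predicate typed as INT32: if something like this were pushed down for an INT64
        // column, the inspector would have no long handler (my assumption about the
        // UnsupportedOperationException above, not verified).
        FilterPredicate asInt = FilterApi.gt(FilterApi.intColumn("id"), 222222);

        System.out.println(asLong);
        System.out.println(asInt);
    }
}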
Code:
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.generic.IndexedRecord;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetTableSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.types.Row;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.schema.MessageType;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
public class ParquetTestCase extends MultipleProgramsTestBase {

    private static String avroSchema = "{\n" +
        "  \"name\": \"SimpleRecord\",\n" +
        "  \"type\": \"record\",\n" +
        "  \"fields\": [\n" +
        "    { \"default\": null, \"name\": \"timestamp_edr\", \"type\": [ \"null\", \"long\" ]},\n" +
        "    { \"default\": null, \"name\": \"id\", \"type\": [ \"null\", \"long\" ]},\n" +
        "    { \"default\": null, \"name\": \"recordType_\", \"type\": [ \"null\", \"string\" ]}\n" +
        "  ],\n" +
        "  \"schema_id\": 1\n" +
        "}";

    private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter();
    private static Schema schm = new Schema.Parser().parse(avroSchema);
    private static Path testPath;

    public ParquetTestCase() {
        super(TestExecutionMode.COLLECTION);
    }

    @BeforeClass
    public static void setup() throws Exception {
        // Write 12 generic records (6 x Type1, 6 x Type2) to a Parquet file under /tmp.
        GenericRecordBuilder genericRecordBuilder = new GenericRecordBuilder(schm);
        List<IndexedRecord> recs = new ArrayList<>();
        for (int i = 0; i < 6; i++) {
            GenericRecord gr = genericRecordBuilder
                .set("timestamp_edr", System.currentTimeMillis() / 1000)
                .set("id", 3333333L)
                .set("recordType_", "Type1")
                .build();
            recs.add(gr);
            GenericRecord gr2 = genericRecordBuilder
                .set("timestamp_edr", System.currentTimeMillis() / 1000)
                .set("id", 222222L)
                .set("recordType_", "Type2")
                .build();
            recs.add(gr2);
        }

        testPath = new Path("/tmp", UUID.randomUUID().toString());
        ParquetWriter<IndexedRecord> writer = AvroParquetWriter.<IndexedRecord>builder(
            new org.apache.hadoop.fs.Path(testPath.toUri())).withSchema(schm).build();
        for (IndexedRecord record : recs) {
            writer.write(record);
        }
        writer.close();
    }

    private ParquetTableSource createParquetTableSource(Path path) throws IOException {
        // Convert the Avro schema to a Parquet MessageType for the table source.
        MessageType nestedSchema = SCHEMA_CONVERTER.convert(schm);
        return ParquetTableSource.builder()
            .path(path.getPath())
            .forParquetSchema(nestedSchema)
            .build();
    }

    @Test
    public void testScan() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment batchTableEnvironment = BatchTableEnvironment.create(env);
        ParquetTableSource tableSource = createParquetTableSource(testPath);
        batchTableEnvironment.registerTableSource("ParquetTable", tableSource);
        // The WHERE clause on the nullable long column is what triggers the failure above.
        Table tab = batchTableEnvironment.sqlQuery(
            "select id, recordType_ from ParquetTable where id > 222222");
        DataSet<Row> result = batchTableEnvironment.toDataSet(tab, Row.class);
        result.print();
    }
}
From: Peter Huang <[hidden email]>
Sent: Monday, November 18, 2019 7:22 PM
To: dev <[hidden email]>
Cc: [hidden email]
Subject: Re: SQL for Avro GenericRecords on Parquet
Hi Hanan,
Thanks for reporting the issue. Could you please attach your test code here? I can help investigate.
Best Regards
Peter Huang
On Mon, Nov 18, 2019 at 2:51 AM Hanan Yehudai <[hidden email]> wrote:
I have tried to persist generic Avro records in a Parquet file and then read them via ParquetTableSource, using SQL.
It seems that the SQL is not executed properly!
The persisted records are:
id, recordType_
3333333,Type1
222222,Type2
3333333,Type1
222222,Type2
3333333,Type1
222222,Type2
3333333,Type1
222222,Type2
3333333,Type1
222222,Type2
3333333,Type1
222222,Type2
The SQL "SELECT id, recordType_ FROM ParquetTable" returns the above (which is correct).
Running "SELECT id, recordType_ FROM ParquetTable WHERE recordType_='Type1'"
results in:
3333333,Type1
222222,Type1
3333333,Type1
222222,Type1
3333333,Type1
222222,Type1
3333333,Type1
222222,Type1
3333333,Type1
222222,Type1
3333333,Type1
222222,Type1
As if the equals sign were an assignment and not a comparison…
Am I doing something wrong? Is it an issue of GenericRecord vs. SpecificRecord?
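In case it helps narrow this down: one way to check whether the Parquet predicate pushdown is the culprit would be to scan without a WHERE clause and apply the filter in the DataSet API instead. A rough, untested sketch; the printType1Rows name is mine, and it assumes a table "ParquetTable" has been registered from the ParquetTableSource, as in my test code:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

public class FilterAfterScanSketch {

    // Assumes "ParquetTable" was registered on tEnv from the ParquetTableSource.
    static void printType1Rows(BatchTableEnvironment tEnv) throws Exception {
        // No WHERE clause, so no predicate is pushed down to the Parquet reader.
        Table all = tEnv.sqlQuery("SELECT id, recordType_ FROM ParquetTable");
        DataSet<Row> rows = tEnv.toDataSet(all, Row.class);

        // Apply the predicate in Flink instead of in SQL.
        rows.filter(r -> "Type1".equals(String.valueOf(r.getField(1))))
            .print();
    }
}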