Help with Flink experimental Table API


Help with Flink experimental Table API

Shiti Saxena
Hi,

In our project, we are using the Flink Table API and are facing the following issue.

We load data from a CSV file and create a DataSet[Row]. The CSV file can contain invalid entries in some fields, which we replace with null when building the DataSet[Row].
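
For reference, this is roughly how we build the DataSet[Row]. The file name, field layout, and parsing below are simplified placeholders rather than our actual code (Row is org.apache.flink.api.table.Row, env an ExecutionEnvironment):

import org.apache.flink.api.scala._
import org.apache.flink.api.table.Row

// Illustrative: field 0 should be an Int; unparseable entries become null.
// A TypeInformation[Row] describing the schema (a RowTypeInfo) is assumed in scope.
val rows: DataSet[Row] = env.readTextFile("data.csv").map { line =>
  val fields = line.split(",")
  val row = new Row(2)
  row.setField(0, try { fields(0).trim.toInt } catch { case _: NumberFormatException => null })
  row.setField(1, fields(1).trim)
  row
}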

This DataSet[Row] is later converted to a Table whenever required, and operations such as select, aggregate, etc. are performed on it.

When a null value is encountered, we get a NullPointerException and the whole job fails. (We can see this by calling collect on the resulting DataSet.)

The error message is similar to:

Job execution failed.
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:315)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:94)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NullPointerException
at org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
at org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
at org.apache.flink.api.table.typeinfo.RowSerializer.serialize(RowSerializer.scala:80)
at org.apache.flink.api.table.typeinfo.RowSerializer.serialize(RowSerializer.scala:28)
at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:83)
at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:724)

Could this be because the RowSerializer does not support null values (similar to FLINK-629)?

Currently, to work around this issue, we ignore all rows which may have null values. For example, we have a method cleanData defined as:

def cleanData(table: Table, relevantColumns: Seq[String]): Table = {
  // Build a predicate like "col1.isNotNull && col2.isNotNull && ..."
  val whereClause: String = relevantColumns
    .map(cName => s"$cName.isNotNull")
    .mkString(" && ")

  // Keep only the relevant columns and drop rows with nulls in any of them.
  table.select(relevantColumns.mkString(",")).where(whereClause)
}

Before operating on any Table, we apply this method and then continue with the task.
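
For example (table and column names here are illustrative):

val cleaned: Table = cleanData(rawTable, Seq("id", "name"))
val result = cleaned.select("id, name")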

Is this the right way to handle this? If not, please let me know how to go about it.


Thanks,
Shiti




Re: Help with Flink experimental Table API

Aljoscha Krettek
Hi,
Yes, I think the problem is that the RowSerializer does not support null values. I think we can add support for this; I will open a Jira issue.

Another problem I see is that the aggregations cannot properly deal with null values. That would need separate support.

Regards,
Aljoscha



Re: Help with Flink experimental Table API

Shiti Saxena
Hi Aljoscha,

Could you please point me to the JIRA tickets? If you could provide some guidance on how to resolve these, I will work on them and raise a pull request.

Thanks,
Shiti




Re: Help with Flink experimental Table API

Till Rohrmann
Hi Shiti,

here is the issue [1].

Cheers,
Till



Re: Help with Flink experimental Table API

Aljoscha Krettek
Cool, good to hear.

The PojoSerializer already handles null fields, and the RowSerializer can be modified in pretty much the same way. So you should start by looking at the copy()/serialize()/deserialize() methods of PojoSerializer and then change RowSerializer accordingly.
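
The core idea, in a self-contained sketch (plain java.io streams rather than Flink's DataInputView/DataOutputView, but the pattern is the same): prefix every field with a flag so the deserializer knows whether a value follows.

import java.io._

object NullMarkerSketch {
  // Write a boolean marker before each field; true means "field is null".
  def writeField(out: DataOutput, value: Integer): Unit =
    if (value == null) out.writeBoolean(true)
    else { out.writeBoolean(false); out.writeInt(value) }

  // Read the marker first, then read the value only if one was written.
  def readField(in: DataInput): Integer =
    if (in.readBoolean()) null else Integer.valueOf(in.readInt())

  def main(args: Array[String]): Unit = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    writeField(out, 42)
    writeField(out, null)

    val in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray))
    println(readField(in)) // 42
    println(readField(in)) // null
  }
}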

You can also send me a private mail if you want more in-depth explanations. 



Re: Help with Flink experimental Table API

Aljoscha Krettek
I merged your PR for the RowSerializer. Teaching the aggregators to deal with null values should be a very simple fix in ExpressionAggregateFunction.scala: it currently always aggregates the values without checking whether they are null. If you want, you can fix that as well, or I can quickly fix it.
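
Schematically, the fix is just a null guard around the accumulation step. A minimal sketch of the idea (not the actual ExpressionAggregateFunction code):

object NullAwareSum {
  // Accumulate only non-null values; nulls are skipped instead of
  // being fed into the aggregator.
  def sum(values: Seq[Integer]): Long =
    values.foldLeft(0L) { (acc, v) =>
      if (v == null) acc else acc + v.intValue()
    }

  def main(args: Array[String]): Unit = {
    println(sum(Seq[Integer](123, 234, 345, null))) // prints 702
  }
}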



Re: Help with Flink experimental Table API

Shiti Saxena
I'll do the fix



Re: Help with Flink experimental Table API

Shiti Saxena
Hi Aljoscha,

I created the issue FLINK-2210 for aggregation on null values and made changes to ExpressionAggregateFunction so that it ignores them. But I am unable to create a Table with null values in tests.

The code I used is:

def testAggregationWithNull(): Unit = {
  val env = ExecutionEnvironment.getExecutionEnvironment
  val table = env.fromElements((123, "a"), (234, "b"), (345, "c"), (null, "d")).toTable

  val total = table.select('_1.sum).collect().head.productElement(0)
  assertEquals(total, 702)
}

and the error I get is:

org.apache.flink.api.table.ExpressionException: Invalid expression "('_1).sum": Unsupported type GenericType<java.lang.Object> for aggregation ('_1).sum. Only numeric data types supported.
at org.apache.flink.api.table.expressions.analysis.TypeCheck.apply(TypeCheck.scala:50)
at org.apache.flink.api.table.expressions.analysis.TypeCheck.apply(TypeCheck.scala:31)
at org.apache.flink.api.table.trees.Analyzer$$anonfun$analyze$1.apply(Analyzer.scala:34)
at org.apache.flink.api.table.trees.Analyzer$$anonfun$analyze$1.apply(Analyzer.scala:31)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.flink.api.table.trees.Analyzer.analyze(Analyzer.scala:31)
at org.apache.flink.api.table.Table$$anonfun$1.apply(Table.scala:59)
at org.apache.flink.api.table.Table$$anonfun$1.apply(Table.scala:59)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.flink.api.table.Table.select(Table.scala:59)
at org.apache.flink.api.scala.table.test.AggregationsITCase.testAggregationWithNull(AggregationsITCase.scala:135)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.junit.runners.Suite.runChild(Suite.java:127)
at org.junit.runners.Suite.runChild(Suite.java:26)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:78)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:212)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:68)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)


The ExecutionEnvironment.fromCollection method also throws an error when the collection contains a null.

Could you please point out what I am doing wrong? How do we create a Table with null values?

In our application, we load a file and transform each line into a Row, resulting in a DataSet[Row]. This DataSet[Row] is then converted into a Table. Should I use the same approach for the test case?


Thanks,
Shiti



Re: Help with Flink experimental Table API

Aljoscha Krettek
Hi,
I think the problem is that the Scala compiler derives a wrong type for this statement:




Reply | Threaded
Open this post in threaded view
|

Re: Help with Flink experimental Table API

Aljoscha Krettek
Hi,
sorry, my mail client sent before I was done.

I think the problem is that the Scala compiler derives a wrong type for this statement:
val table = env.fromElements((123, "a"), (234, "b"), (345, "c"), (null, "d")).toTable

Because of the null value it derives (Any, String) as the type. If you do it like this, I think it should work:
val table = env.fromElements[(Integer, String)]((123, "a"), (234, "b"), (345, "c"), (null, "d")).toTable

I used Integer instead of Int because Scala will complain that null is not a valid value for Int otherwise.
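The difference is visible in plain Scala too, with the same values and no Flink involved:

val untyped = Seq((123, "a"), (234, "b"), (345, "c"), (null, "d"))
// inferred as Seq[(Any, String)]: the least upper bound of Int and Null is Any

val typed = Seq[(Integer, String)]((123, "a"), (234, "b"), (345, "c"), (null, "d"))
// element type stays (Integer, String); Integer is a reference type, so null is allowed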

Cheers,
Aljoscha


On Sun, 14 Jun 2015 at 18:28 Shiti Saxena <[hidden email]> wrote:
Hi Aljoscha,

I created the issue FLINK-2210 for aggregation on null values. I made changes to ExpressionAggregateFunction so that it ignores null values, but I am unable to create a Table with null values in tests.
 
The code I used is,

def testAggregationWithNull(): Unit = {
  val env = ExecutionEnvironment.getExecutionEnvironment
  val table = env.fromElements((123, "a"), (234, "b"), (345, "c"), (null, "d")).toTable

  val total = table.select('_1.sum).collect().head.productElement(0)
  assertEquals(total, 702)
}

and the error I get is,

org.apache.flink.api.table.ExpressionException: Invalid expression "('_1).sum": Unsupported type GenericType<java.lang.Object> for aggregation ('_1).sum. Only numeric data types supported.
at org.apache.flink.api.table.expressions.analysis.TypeCheck.apply(TypeCheck.scala:50)
at org.apache.flink.api.table.expressions.analysis.TypeCheck.apply(TypeCheck.scala:31)
at org.apache.flink.api.table.trees.Analyzer$$anonfun$analyze$1.apply(Analyzer.scala:34)
at org.apache.flink.api.table.trees.Analyzer$$anonfun$analyze$1.apply(Analyzer.scala:31)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.flink.api.table.trees.Analyzer.analyze(Analyzer.scala:31)
at org.apache.flink.api.table.Table$$anonfun$1.apply(Table.scala:59)
at org.apache.flink.api.table.Table$$anonfun$1.apply(Table.scala:59)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.flink.api.table.Table.select(Table.scala:59)
at org.apache.flink.api.scala.table.test.AggregationsITCase.testAggregationWithNull(AggregationsITCase.scala:135)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.junit.runners.Suite.runChild(Suite.java:127)
at org.junit.runners.Suite.runChild(Suite.java:26)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:78)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:212)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:68)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)


The ExecutionEnvironment.fromCollection method also throws an error when the collection contains a null.

Could you please point out what I am doing wrong? How do we create a Table with null values?

In our application, we load a file and transform each line into a Row, resulting in a DataSet[Row]. This DataSet[Row] is then converted into a Table. Should I use the same approach for the test case?


Thanks,
Shiti









On Sun, Jun 14, 2015 at 4:10 PM, Shiti Saxena <[hidden email]> wrote:
I'll do the fix

On Sun, Jun 14, 2015 at 12:42 AM, Aljoscha Krettek <[hidden email]> wrote:
I merged your PR for the RowSerializer. Teaching the aggregators to deal with null values should be a very simple fix in ExpressionAggregateFunction.scala; there it is simply always aggregating the values without checking whether they are null. If you want, you can also fix that, or I can quickly fix it.

On Thu, 11 Jun 2015 at 10:40 Aljoscha Krettek <[hidden email]> wrote:
Cool, good to hear.

The PojoSerializer already handles null fields. The RowSerializer can be modified in pretty much the same way. So you should start by looking at the copy()/serialize()/deserialize() methods of PojoSerializer and then modify RowSerializer in a similar way.
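To make the null-marker idea concrete, here is a minimal sketch of how serialize()/deserialize() could handle null fields (illustrative only; the fieldSerializers member and the surrounding class are assumptions, not the real RowSerializer code):

// Assumed to live inside a RowSerializer-like class that holds
// fieldSerializers: Array[TypeSerializer[Any]], one serializer per field.
def serialize(value: Row, target: DataOutputView): Unit = {
  var i = 0
  while (i < fieldSerializers.length) {
    val field = value.productElement(i)
    target.writeBoolean(field == null)   // one null marker per field
    if (field != null) {
      fieldSerializers(i).serialize(field, target)
    }
    i += 1
  }
}

def deserialize(source: DataInputView): Row = {
  val row = new Row(fieldSerializers.length)
  var i = 0
  while (i < fieldSerializers.length) {
    if (source.readBoolean()) {
      row.setField(i, null)              // marker says the field was null
    } else {
      row.setField(i, fieldSerializers(i).deserialize(source))
    }
    i += 1
  }
  row
}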

You can also send me a private mail if you want more in-depth explanations. 

On Thu, 11 Jun 2015 at 09:33 Till Rohrmann <[hidden email]> wrote:
Hi Shiti,

here is the issue [1].

Cheers,
Till


On Thu, Jun 11, 2015 at 8:42 AM Shiti Saxena <[hidden email]> wrote:
Hi Aljoscha,

Could you please point me to the JIRA tickets? If you could provide some guidance on how to resolve these, I will work on them and raise a pull-request.

Thanks,
Shiti


Re: Help with Flink experimental Table API

Shiti Saxena
Hi,

For 

val table = env.fromElements[(Integer, String)]((123, "a"), (234, "b"), (345, "c"), (null, "d")).toTable

I get the following error,

Error translating node 'Data Source "at org.apache.flink.api.scala.ExecutionEnvironment.fromElements(ExecutionEnvironment.scala:505) (org.apache.flink.api.java.io.CollectionInputFormat)" : NONE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': null
org.apache.flink.optimizer.CompilerException: Error translating node 'Data Source "at org.apache.flink.api.scala.ExecutionEnvironment.fromElements(ExecutionEnvironment.scala:505) (org.apache.flink.api.java.io.CollectionInputFormat)" : NONE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': null
at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:360)
at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:103)
at org.apache.flink.optimizer.plan.SourcePlanNode.accept(SourcePlanNode.java:87)
at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:170)
at org.apache.flink.test.util.TestEnvironment.execute(TestEnvironment.java:52)
at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
at org.apache.flink.api.scala.table.test.AggregationsITCase.testAggregationWithNull(AggregationsITCase.scala:135)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.junit.runners.Suite.runChild(Suite.java:127)
at org.junit.runners.Suite.runChild(Suite.java:26)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:78)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:212)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:68)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.lang.NullPointerException
at org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
at org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:89)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:29)
at org.apache.flink.api.java.io.CollectionInputFormat.writeObject(CollectionInputFormat.java:88)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1541)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:314)
at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:268)
at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)
at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createDataSourceVertex(JobGraphGenerator.java:853)
at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:260)
... 55 more


Does this mean that the collect method is being called before the aggregation runs? Is this because the base serializers do not handle null values the way PojoSerializer does? And is that why fromCollection does not support collections containing null values?

Alternatively, I could write the test using a file load, if that's alright.



Re: Help with Flink experimental Table API

Shiti Saxena
Hi,

Re-writing the test in the following manner works, but I am not sure if this is the correct way.

def testAggregationWithNull(): Unit = {
  val env = ExecutionEnvironment.getExecutionEnvironment
  val dataSet = env.fromElements[(Integer, String)]((123, "a"), (234, "b"), (345, "c"), (0, "d"))

  implicit val rowInfo: TypeInformation[Row] = new RowTypeInfo(
    Seq(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), Seq("id", "name"))

  val rowDataSet = dataSet.map {
    entry =>
      // introduce the null only inside the map, so that fromElements never has
      // to serialize a tuple containing null; only the RowSerializer sees it
      val row = new Row(2)
      val amount = if (entry._1 < 100) null else entry._1
      row.setField(0, amount)
      row.setField(1, entry._2)
      row
  }

  val total = rowDataSet.toTable.select('id.sum).collect().head.productElement(0)
  assertEquals(total, 702)
}




Re: Help with Flink experimental Table API

Aljoscha Krettek
Hi,
the reason why this doesn't work is that the TupleSerializer cannot deal with null values:
@Test
def testAggregationWithNull(): Unit = {
  val env = ExecutionEnvironment.getExecutionEnvironment
  val table = env.fromElements[(Integer, String)](
    (123, "a"), (234, "b"), (345, "c"), (null, "d")).toTable

  val total = table.select('_1.sum).collect().head.productElement(0)
  assertEquals(total, 702)
}
It would have to be modified in a similar way to the PojoSerializer and RowSerializer. You could either leave the tests as they are now in your pull request or also modify the TupleSerializer. Both seem fine to me.
Cheers,
Aljoscha

On Sun, 14 Jun 2015 at 20:28 Shiti Saxena <[hidden email]> wrote:
Hi,

Re-writing the test in the following manner works. But I am not sure if this is the correct way.

def testAggregationWithNull(): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements[(Integer, String)]((123, "a"), (234, "b"), (345, "c"), (0, "d"))

    implicit val rowInfo: TypeInformation[Row] = new RowTypeInfo(
      Seq(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), Seq("id", "name"))

    val rowDataSet = dataSet.map {
      entry =>
        val row = new Row(2)
        val amount = if(entry._1<100) null else entry._1
        row.setField(0, amount)
        row.setField(1, entry._2)
        row
    }
    
    val total = rowDataSet.toTable.select('id.sum).collect().head.productElement(0)
    assertEquals(total, 702)
  }



Re: Help with Flink experimental Table API

Shiti Saxena
Hi,

Can I work on the issue with TupleSerializer or is someone working on it?

On Mon, Jun 15, 2015 at 11:20 AM, Aljoscha Krettek <[hidden email]> wrote:
Hi,
the reason why this doesn't work is that the TupleSerializer cannot deal with null values:

@Test
def testAggregationWithNull(): Unit = {

  val env = ExecutionEnvironment.getExecutionEnvironment
  val table = env.fromElements[(Integer, String)](
    (123, "a"), (234, "b"), (345, "c"), (null, "d")).toTable

  val total = table.select('_1.sum).collect().head.productElement(0)
  assertEquals(total, 702)
}

It would have to be modified in a similar way to the PojoSerializer and RowSerializer. You could either leave the tests as they are now in your pull request or also modify the TupleSerializer. Both seem fine to me.
Cheers,
Aljoscha
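
For illustration, here is a minimal sketch of the PojoSerializer-style null handling the thread keeps referring to: a boolean marker is written before every field, so deserialization knows whether a value follows. The class and member names (NullAwareFieldWriter and friends) are made up for this sketch and are not the actual Flink sources.

import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.table.Row
import org.apache.flink.core.memory.{DataInputView, DataOutputView}

// Hypothetical helper, not the real RowSerializer/TupleSerializer:
// a marker boolean precedes each field, so null fields carry no value.
class NullAwareFieldWriter(fieldSerializers: Array[TypeSerializer[Any]]) {

  def serialize(record: Row, target: DataOutputView): Unit = {
    var i = 0
    while (i < fieldSerializers.length) {
      val field = record.productElement(i)
      if (field == null) {
        target.writeBoolean(true)   // null: write the marker only
      } else {
        target.writeBoolean(false)  // non-null: marker, then the value
        fieldSerializers(i).serialize(field, target)
      }
      i += 1
    }
  }

  def deserialize(reuse: Row, source: DataInputView): Row = {
    var i = 0
    while (i < fieldSerializers.length) {
      if (source.readBoolean()) reuse.setField(i, null)
      else reuse.setField(i, fieldSerializers(i).deserialize(source))
      i += 1
    }
    reuse
  }
}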

Re: Help with Flink experimental Table API

Aljoscha Krettek
I think you can work on it. By the way, there are actually two serializers. For Scala, CaseClassSerializer is responsible for tuples as well. In Java, TupleSerializer is responsible for, well, Tuples. 
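
If it helps to check which serializer a given type actually gets, something along these lines should show it (this assumes a Flink 0.9-style API where createSerializer takes an ExecutionConfig; older versions take no argument):

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.scala._

object WhichSerializer {
  def main(args: Array[String]): Unit = {
    // A Scala tuple should report a CaseClassSerializer here; a Java
    // Tuple2 with the same fields would get a TupleSerializer instead.
    val info = createTypeInformation[(Integer, String)]
    println(info.createSerializer(new ExecutionConfig))
  }
}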

On Tue, 16 Jun 2015 at 06:25 Shiti Saxena <[hidden email]> wrote:
Hi,

Can I work on the issue with TupleSerializer or is someone working on it?

On Mon, Jun 15, 2015 at 11:20 AM, Aljoscha Krettek <[hidden email]> wrote:
Hi,
the reason why this doesn't work is that the TupleSerializer cannot deal with null values:
@Test
def testAggregationWithNull(): Unit = {

val env = ExecutionEnvironment.getExecutionEnvironment
val table = env.fromElements[(Integer, String)](
(123, "a"), (234, "b"), (345, "c"), (null, "d")).toTable

val total = table.select('_1.sum).collect().head.productElement(0)
assertEquals(total, 702)
}
it would have to modified in a similar way to the PojoSerializer and RowSerializer. You could either leave the tests as they are now in you pull request or also modify the TupleSerializer. Both seem fine to me.
Cheers,
Aljoscha

On Sun, 14 Jun 2015 at 20:28 Shiti Saxena <[hidden email]> wrote:
Hi,

Re-writing the test in the following manner works. But I am not sure if this is the correct way.

def testAggregationWithNull(): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val dataSet = env.fromElements[(Integer, String)]((123, "a"), (234, "b"), (345, "c"), (0, "d"))

    implicit val rowInfo: TypeInformation[Row] = new RowTypeInfo(
      Seq(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), Seq("id", "name"))

    val rowDataSet = dataSet.map {
      entry =>
        val row = new Row(2)
        val amount = if(entry._1<100) null else entry._1
        row.setField(0, amount)
        row.setField(1, entry._2)
        row
    }
    
    val total = rowDataSet.toTable.select('id.sum).collect().head.productElement(0)
    assertEquals(total, 702)
  }



On Sun, Jun 14, 2015 at 11:42 PM, Shiti Saxena <[hidden email]> wrote:
Hi,

For 

val table = env.fromElements[(Integer, String)]((123, "a"), (234, "b"), (345, "c"), (null, "d")).toTable

I get the following error,

Error translating node 'Data Source "at org.apache.flink.api.scala.ExecutionEnvironment.fromElements(ExecutionEnvironment.scala:505) (org.apache.flink.api.java.io.CollectionInputFormat)" : NONE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': null
org.apache.flink.optimizer.CompilerException: Error translating node 'Data Source "at org.apache.flink.api.scala.ExecutionEnvironment.fromElements(ExecutionEnvironment.scala:505) (org.apache.flink.api.java.io.CollectionInputFormat)" : NONE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': null
at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:360)
at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:103)
at org.apache.flink.optimizer.plan.SourcePlanNode.accept(SourcePlanNode.java:87)
at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:170)
at org.apache.flink.test.util.TestEnvironment.execute(TestEnvironment.java:52)
at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
at org.apache.flink.api.scala.table.test.AggregationsITCase.testAggregationWithNull(AggregationsITCase.scala:135)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.junit.runners.Suite.runChild(Suite.java:127)
at org.junit.runners.Suite.runChild(Suite.java:26)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:78)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:212)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:68)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.lang.NullPointerException
at org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
at org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:89)
at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:29)
at org.apache.flink.api.java.io.CollectionInputFormat.writeObject(CollectionInputFormat.java:88)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1541)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:314)
at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:268)
at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)
at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createDataSourceVertex(JobGraphGenerator.java:853)
at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:260)
... 55 more


Does this mean that the collect method is being called before doing the aggregation? Is this because base serializers do not handle null values like POJOSerializer? And is that why fromCollection does not support collections with null values?

Or I could write the test using a file load if thats alright.


On Sun, Jun 14, 2015 at 11:11 PM, Aljoscha Krettek <[hidden email]> wrote:
Hi,
sorry, my mail client sent before I was done.

I think the problem is that the Scala compiler derives a wrong type for this statement:
val table = env.fromElements((123, "a"), (234, "b"), (345, "c"), (null, "d")).toTable

Because of the null value it derives (Any, String) as the type if you do it like this, I think it should work:
val table = env.fromElements[(Integer, String)]((123, "a"), (234, "b"), (345, "c"), (null, "d")).toTable

I used Integer instead of Int because Scala will complain that null is not a valid value for Int otherwise.

Cheers,
Aljoscha


On Sun, 14 Jun 2015 at 19:34 Aljoscha Krettek <[hidden email]> wrote:
Hi,
I think the problem is that the Scala compiler derives a wrong type for this statement:



On Sun, 14 Jun 2015 at 18:28 Shiti Saxena <[hidden email]> wrote:
Hi Aljoscha,

I created the issue FLINK-2210 for aggregate on null. I made changes to ExpressionAggregateFunction to handle ignore null values. But I am unable to create a Table with null values in tests.
 
The code I used is,

def testAggregationWithNull(): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val table = env.fromElements((123, "a"), (234, "b"), (345, "c"), (null, "d")).toTable

    val total = table.select('_1.sum).collect().head.productElement(0)
    assertEquals(total, 702)
  }

and the error i get is,

org.apache.flink.api.table.ExpressionException: Invalid expression "('_1).sum": Unsupported type GenericType<java.lang.Object> for aggregation ('_1).sum. Only numeric data types supported.
at org.apache.flink.api.table.expressions.analysis.TypeCheck.apply(TypeCheck.scala:50)
at org.apache.flink.api.table.expressions.analysis.TypeCheck.apply(TypeCheck.scala:31)
at org.apache.flink.api.table.trees.Analyzer$$anonfun$analyze$1.apply(Analyzer.scala:34)
at org.apache.flink.api.table.trees.Analyzer$$anonfun$analyze$1.apply(Analyzer.scala:31)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.flink.api.table.trees.Analyzer.analyze(Analyzer.scala:31)
at org.apache.flink.api.table.Table$$anonfun$1.apply(Table.scala:59)
at org.apache.flink.api.table.Table$$anonfun$1.apply(Table.scala:59)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.flink.api.table.Table.select(Table.scala:59)
at org.apache.flink.api.scala.table.test.AggregationsITCase.testAggregationWithNull(AggregationsITCase.scala:135)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.junit.runners.Suite.runChild(Suite.java:127)
at org.junit.runners.Suite.runChild(Suite.java:26)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:78)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:212)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:68)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)


The ExecutionEnvironment.fromCollection method also throws an error when the collection contains a null.

Could you please point out what I am doing wrong? How do we create a Table with null values?

In our application, we load a file and transform each line into a Row resulting in a DataSet[Row]. This DataSet[Row] is then converted into Table. Should I use the same approach for the test case?


Thanks,
Shiti









On Sun, Jun 14, 2015 at 4:10 PM, Shiti Saxena <[hidden email]> wrote:
I'll do the fix

On Sun, Jun 14, 2015 at 12:42 AM, Aljoscha Krettek <[hidden email]> wrote:
I merged your PR for the RowSerializer. Teaching the aggregators to deal with null values should be a very simple fix in ExpressionAggregateFunction.scala. There it is simply always aggregating the values without checking whether they are null. If you want you can also fix that or I can quickly fix it.

On Thu, 11 Jun 2015 at 10:40 Aljoscha Krettek <[hidden email]> wrote:
Cool, good to hear.

The PojoSerializer already handles null fields. The RowSerializer can be modified in pretty much the same way. So you should start by looking at the copy()/serialize()/deserialize() methods of PojoSerializer and then modify RowSerializer in a similar way.

You can also send me a private mail if you want more in-depth explanations. 

Re: Help with Flink experimental Table API

Aljoscha Krettek
One more thing: it would be good if the TupleSerializer didn't write a boolean for every field. A single integer could be used instead, with one bit per field indicating whether that field is null. (Maybe we should also add this to the RowSerializer in the future.)
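
As a rough illustration of that idea (hypothetical helper code, not anything from the Flink source; here a set bit means the field is present, matching the example further down the thread, and using an Int caps this at 32 fields):

def writeNullMask(fields: IndexedSeq[Any], target: java.io.DataOutput): Unit = {
  var mask = 0
  var i = 0
  while (i < fields.length) {
    if (fields(i) != null) {
      mask |= (1 << i)             // set bit i when field i is non-null
    }
    i += 1
  }
  target.writeInt(mask)            // one Int instead of one boolean per field
}

def readNullMask(source: java.io.DataInput, arity: Int): Array[Boolean] = {
  val mask = source.readInt()
  Array.tabulate(arity)(i => (mask & (1 << i)) != 0)   // true = field i is non-null
}

Rows wider than 32 fields would need a Long or several Ints, but the principle stays the same.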

Re: Help with Flink experimental Table API

Shiti Saxena
Can we use 0 (false) and 1 (true)?

Re: Help with Flink experimental Table API

Aljoscha Krettek
Yes, what I meant was a single bit mask that is written once, before all the fields. Then, for example (reading the least-significant bit as field 1), 1011 would mean that fields 1, 2, and 4 are non-null while field 3 is null.
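
To make that concrete, a throwaway snippet that decodes the mask, with the least-significant bit standing for field 1:

val mask = Integer.parseInt("1011", 2)       // the mask from the example above
for (i <- 0 until 4) {
  val nonNull = (mask & (1 << i)) != 0       // bit i (LSB first) = field i + 1
  println(s"field ${i + 1}: " + (if (nonNull) "non-null" else "null"))
}
// prints: field 1: non-null, field 2: non-null, field 3: null, field 4: non-null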

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)


The ExecutionEnvironment.fromCollection method also throws an error when the collection contains a null.

Could you please point out what I am doing wrong? How do we create a Table with null values?

In our application, we load a file and transform each line into a Row, resulting in a DataSet[Row]. This DataSet[Row] is then converted into a Table. Should I use the same approach for the test case?
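
For reference, this is roughly what that looks like in a test (a sketch; I am not sure RowTypeInfo takes exactly these constructor arguments, so treat that part as an assumption):

// build a DataSet[Row] directly, with null standing in for an invalid field
implicit val rowType: TypeInformation[Row] =
  new RowTypeInfo(Seq(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), Seq("id", "name"))

val rows: DataSet[Row] = env.fromCollection(
  Seq[(Any, String)]((123, "a"), (234, "b"), (345, "c"), (null, "d")).map { case (i, s) =>
    val row = new Row(2)
    row.setField(0, i)   // may be null
    row.setField(1, s)
    row
  })
val table = rows.toTable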


Thanks,
Shiti

On Sun, Jun 14, 2015 at 4:10 PM, Shiti Saxena <[hidden email]> wrote:
I'll do the fix

On Sun, Jun 14, 2015 at 12:42 AM, Aljoscha Krettek <[hidden email]> wrote:
I merged your PR for the RowSerializer. Teaching the aggregators to deal with null values should be a very simple fix in ExpressionAggregateFunction.scala: at the moment it simply always aggregates the values without checking whether they are null. If you want, you can fix that as well, or I can quickly fix it.
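
The change is basically a null check before a value reaches the running aggregate. As a standalone sketch (the real ExpressionAggregateFunction API is different; this only illustrates the idea):

// hypothetical null-aware sum, not the actual Flink class
class NullAwareSumAggregator {
  private var sum: Long = 0L

  def aggregate(value: Any): Unit = value match {
    case null   => // skip nulls instead of aggregating them
    case i: Int => sum += i
    case other  => throw new IllegalArgumentException(s"unsupported type: $other")
  }

  def result: Long = sum
}

// e.g. aggregating 123, 234, 345 and a null yields 702
val agg = new NullAwareSumAggregator
Seq[Any](123, 234, 345, null).foreach(agg.aggregate)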

On Thu, 11 Jun 2015 at 10:40 Aljoscha Krettek <[hidden email]> wrote:
Cool, good to hear.

The PojoSerializer already handles null fields. The RowSerializer can be modified in pretty much the same way. So you should start by looking at the copy()/serialize()/deserialize() methods of PojoSerializer and then modify RowSerializer in a similar way.
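
The core of it is a null marker written before every field, roughly like this (a sketch only; the field and method names inside RowSerializer are approximate):

// null-marker pattern adapted from PojoSerializer, names approximate
override def serialize(value: Row, target: DataOutputView): Unit = {
  var i = 0
  while (i < fieldSerializers.length) {
    val field = value.productElement(i)
    if (field == null) {
      target.writeBoolean(true)    // marker: field is null, no payload follows
    } else {
      target.writeBoolean(false)   // marker: a real value follows
      fieldSerializers(i).serialize(field, target)
    }
    i += 1
  }
}

override def deserialize(source: DataInputView): Row = {
  val row = new Row(fieldSerializers.length)
  var i = 0
  while (i < fieldSerializers.length) {
    if (source.readBoolean()) {
      row.setField(i, null)        // the null marker was set
    } else {
      row.setField(i, fieldSerializers(i).deserialize(source))
    }
    i += 1
  }
  row
}

copy() would need the same treatment.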

You can also send me a private mail if you want more in-depth explanations. 

On Thu, 11 Jun 2015 at 09:33 Till Rohrmann <[hidden email]> wrote:
Hi Shiti,

here is the issue [1].

Cheers,
Till


On Thu, Jun 11, 2015 at 8:42 AM Shiti Saxena <[hidden email]> wrote:
Hi Aljoscha,

Could you please point me to the JIRA tickets? If you could provide some guidance on how to resolve them, I will work on them and raise a pull request.

Thanks,
Shiti

On Thu, Jun 11, 2015 at 11:31 AM, Aljoscha Krettek <[hidden email]> wrote:
Hi,
yes, I think the problem is that the RowSerializer does not support null values. I think we can add support for this; I will open a Jira issue.

Another problem I see is that the aggregations cannot properly deal with null values. This would need separate support.

Regards,
Aljoscha

On Thu, 11 Jun 2015 at 06:41 Shiti Saxena <[hidden email]> wrote:
Hi,

In our project, we are using the Flink Table API and are facing the following issues,

We load data from a CSV file and create a DataSet[Row]. The CSV file can also have invalid entries in some of the fields, which we replace with null when building the DataSet[Row].

This DataSet[Row] is later transformed into a Table whenever required, and specific operations such as select, aggregate, etc. are performed.

When a null value is encountered, we get a null pointer exception and the whole job fails. (We can see this by calling collect on the resulting DataSet.)

The error message is similar to,

Job execution failed.
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
...
Caused by: java.lang.NullPointerException
at org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
[rest of the trace is identical to the first message in this thread; snipped]

Could this be because the RowSerializer does not support null values? (Similar to FLINK-629.)

Currently, to overcome this issue, we ignore all rows that may contain null values. For example, we have a method cleanData defined as,

def cleanData(table: Table, relevantColumns: Seq[String]): Table = {
  // keep only the rows where every relevant column is non-null
  val whereClause: String = relevantColumns
    .map(cName => s"$cName.isNotNull")
    .mkString(" && ")

  table.select(relevantColumns.mkString(",")).where(whereClause)
}
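
For example, with hypothetical column names:

// keeps only the rows where id and name are both non-null
val cleaned = cleanData(table, Seq("id", "name"))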

Before operating on any Table, we apply this method and then continue with the task.

Is this the right way to handle this? If not, please let me know how to go about it.


Thanks,
Shiti