Working with DataStreams of Java objects in Pyflink?


Kevin Lam
Hi all,

Looking to use PyFlink to work with some Scala-defined objects being emitted from a custom source. When trying to manipulate the objects in a PyFlink-defined MapFunction, I'm hitting an error like:

Caused by: java.lang.UnsupportedOperationException: The type information: Option[<...>$Record(id: Long, created_at: Option[Long], updated_at: Option[Long])] is not supported in PyFlink currently.

The Scala object is defined something like:

```
object <...> {
  case class Record(
    id: Long,
    created_at: Option[Long],
    updated_at: Option[Long],
    ...
  )
}
```

The PyFlink code is something like:

```
from pyflink.datastream import MapFunction

class Mutate(MapFunction):
    def map(self, value):
        print(value.id)
        value.id = 123
        # return the modified value so downstream operators receive it
        return value

...

records = env.add_source(...)
records = records.map(Mutate())
```

Can you provide any advice on how to work with these kinds of objects in PyFlink?

Thanks in advance! 

Re: Working with DataStreams of Java objects in Pyflink?

Shuiqiang Chen
Hi Kevin, 

Currently, the POJO type is not supported in the Python DataStream API because it is hard to handle the conversion between Python objects and Java objects. Maybe you can use a Row type to represent the POJO class, such as Types.ROW_NAMED(['id', 'created_at', 'updated_at'], [Types.LONG(), Types.LONG(), Types.LONG()]). We will try to support the POJO type in the future.
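
For example, here is a rough sketch of what that could look like (using from_collection with made-up values to stand in for your custom source):

```
from pyflink.common import Row
from pyflink.common.typeinfo import Types
from pyflink.datastream import MapFunction, StreamExecutionEnvironment

# Row type info mirroring the fields of the Record case class.
record_type = Types.ROW_NAMED(
    ['id', 'created_at', 'updated_at'],
    [Types.LONG(), Types.LONG(), Types.LONG()])

class Mutate(MapFunction):
    def map(self, value):
        # Named Row fields are accessible as attributes.
        print(value.id)
        # Build a new Row rather than mutating the input in place.
        return Row(id=123, created_at=value.created_at,
                   updated_at=value.updated_at)

env = StreamExecutionEnvironment.get_execution_environment()
records = env.from_collection(
    [Row(id=1, created_at=1615800000, updated_at=None)],
    type_info=record_type)
records = records.map(Mutate(), output_type=record_type)
records.print()
env.execute("row_example")
```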

Best,
Shuiqiang


Re: Working with DataStreams of Java objects in Pyflink?

Kevin Lam
Hi Shuiqiang Chen,

Thanks for the quick response. Oh I see, that's too bad that POJOs aren't currently supported.

I'd like to check that I understand your suggestion about RowType. You're suggesting something like:

1/ Define subclasses of RowType in Java/Scala to hold the Java objects we want to manipulate in Python.
2/ When datastreams/sources emit objects of this type, we can read from and mutate these Java-defined RowTypes in PyFlink as needed, because Python doesn't know how to handle arbitrary POJOs but does know how to handle RowType objects.

Is that correct? A simple example of extending/using RowType would be helpful if you have a chance; my rough guess at the Python side is below.
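
For reference, here's my rough guess at what the Python side might look like (MyRowSource is a made-up name, assuming the Scala source were changed to emit Row values):

```
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# Type info describing what the (hypothetical) Row-emitting source produces.
record_type = Types.ROW_NAMED(
    ['id', 'created_at', 'updated_at'],
    [Types.LONG(), Types.LONG(), Types.LONG()])

# MyRowSource is a made-up stand-in for our custom source, assuming it
# emits Row values instead of the Record case class.
records = env.add_source(MyRowSource(), type_info=record_type)
```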

Thanks again for all your help, here and in the other threads on this mailing list. Really appreciate it!
