Change to StreamingFileSink in Flink 1.10

Change to StreamingFileSink in Flink 1.10

Averell
Hi,

I have the following code:
    StreamingFileSink
      .forRowFormat(new Path(path), myEncoder)
      .withRollingPolicy(DefaultRollingPolicy.create().build())
      .withBucketAssigner(myBucketAssigner)
      .build()
This works fine in Flink 1.8.3. However, when I compile against
Flink 1.10.0, I get the following error:
    value build is not a member of ?0
    possible cause: maybe a semicolon is missing before `value build'?

According to the type hints from IntelliJ:
    .forRowFormat(...) returns a RowFormatBuilder[_ <: RowFormatBuilder[_]]
    .withRollingPolicy(...) returns a RowFormatBuilder[_]
    .withBucketAssigner(...) returns Any

I'm using Maven 3.6.0, Java 1.8.0_242, and Scala 2.11.12. I tried building
both with and without IntelliJ; it makes no difference.

I don't understand what's wrong.

Thanks!
Averell




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Change to StreamingFileSink in Flink 1.10

Sivaprasanna
Hi Averell,

Can you please share the complete stack trace of the error?

In reply to Averell's message of Mon, Apr 20, 2020, 4:48 PM.

Re: Change to StreamingFileSink in Flink 1.10

Averell
Hi Sivaprasanna,

That is a compile-time error, not a runtime error.

    value build is not a member of ?0
    possible cause: maybe a semicolon is missing before `value build'?

The code compiles when I call either withRollingPolicy() or
withBucketAssigner() on its own, but not when I call both.
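
For reference, here is a minimal sketch of the two single-setter variants described above, written against the Flink 1.10.0 API. The String element type, SimpleStringEncoder, DateTimeBucketAssigner, and the names SingleSetterSinks, rollingOnly and bucketOnly are assumptions chosen for illustration, not the original code. It needs the Flink streaming dependency on the classpath, and its behaviour is only as reported in this message.

```scala
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy

// Hypothetical helper object, used only to give the two variants a home.
object SingleSetterSinks {

  // Variant 1: only withRollingPolicy() before build().
  def rollingOnly(path: String): StreamingFileSink[String] =
    StreamingFileSink
      .forRowFormat(new Path(path), new SimpleStringEncoder[String]())
      .withRollingPolicy(DefaultRollingPolicy.builder().build())
      .build()

  // Variant 2: only withBucketAssigner() before build().
  def bucketOnly(path: String): StreamingFileSink[String] =
    StreamingFileSink
      .forRowFormat(new Path(path), new SimpleStringEncoder[String]())
      .withBucketAssigner(new DateTimeBucketAssigner[String]())
      .build()
}
```

With a single setter, the result still has a builder type as its upper bound, so build() resolves. Chaining a second setter is where the inferred type degrades to ?0.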

Thanks and regards,
Averell




Re: Change to StreamingFileSink in Flink 1.10

Averell
Hi,

I added the following cast and it works, although it doesn't look nice:

    StreamingFileSink
      .forRowFormat(new Path(path), myEncoder)
      .withRollingPolicy(DefaultRollingPolicy.create().build())
      .withBucketAssigner(myBucketAssigner)
      .asInstanceOf[RowFormatBuilder[IN, String, _]]
      .build()

Thanks and regards,
Averell




Re: Change to StreamingFileSink in Flink 1.10

Leonard Xu
Hi, Averell

I guess the problem lies in neither `#withRollingPolicy` nor `#withBucketAssigner`, and may instead be caused by generic types:
either your Encoder<IN>'s element type (IN) does not match the BucketAssigner<IN, BucketId> element type (IN), or
the generic type information was lost when you instantiated them.

Could you post more of your code?

Best,
Leonard Xu.

In reply to Averell's message of Apr 21, 2020, 07:47.

Re: Change to StreamingFileSink in Flink 1.10

Sivaprasanna
I agree with Leonard. I have just tried the same in Scala 2.11 with Flink 1.10.0 and it works just fine.

Cheers,
Sivaprasanna

In reply to Leonard Xu's message of Tue, Apr 21, 2020, 12:53 PM.

Re: Change to StreamingFileSink in Flink 1.10

Averell
Hello Leonard, Sivaprasanna,

But my code was working fine with Flink v1.8.
I also tried with a simple String DataStream, and got the same error.
    StreamingFileSink
      .forRowFormat(new Path(path), new SimpleStringEncoder[String]())
      .withRollingPolicy(DefaultRollingPolicy.builder().build())
      .withBucketAssigner(new DateTimeBucketAssigner)
      .build()
(screenshot below)
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/FlinkError.png>

It's weird. At first I thought something was wrong with IntelliJ, but I get
the same error when running mvn from the command line.







Re: Change to StreamingFileSink in Flink 1.10

Leonard Xu
Hi, Averell

I noticed you're using Scala, so I tried to reproduce your case locally with Scala 2.11.12 and Flink 1.10.0, and it works for me too.
In your screenshot, it's weird that the hint for the line `.withBucketAssigner(new DateTimeBucketAssigner)` is `Any`;
it should be `RowFormatBuilder`, otherwise you cannot call `#build` on an `Any` object.
You could debug this line to see what happens.
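
To illustrate the point about return types, here is a small dependency-free sketch of a self-typed builder. Sink, Builder, DefaultBuilder and BuilderDemo are hypothetical stand-ins, not Flink's classes. The sketch only shows why every chained call has to keep a builder type for `build()` to resolve; it does not claim to reproduce Flink's implementation.

```scala
// Hypothetical stand-ins for illustration only; these are NOT Flink's classes.
final case class Sink(rollingPolicy: String, bucketAssigner: String)

// Self-typed ("F-bounded") builder: every setter returns T, the builder's own type.
abstract class Builder[T <: Builder[T]] { this: T =>
  protected var rollingPolicy: String = "default"
  protected var bucketAssigner: String = "date-time"

  def withRollingPolicy(policy: String): T = { rollingPolicy = policy; this }
  def withBucketAssigner(assigner: String): T = { bucketAssigner = assigner; this }
  def build(): Sink = Sink(rollingPolicy, bucketAssigner)
}

// A concrete subclass pins T to a known type, so the compiler has nothing to infer.
final class DefaultBuilder extends Builder[DefaultBuilder]

object BuilderDemo {
  def main(args: Array[String]): Unit = {
    val sink = new DefaultBuilder()
      .withRollingPolicy("size-based") // static type: DefaultBuilder
      .withBucketAssigner("by-hour")   // static type: DefaultBuilder
      .build()                         // resolves because the receiver is still a builder
    println(sink)
  }
}
```

If T were an unknown wildcard instead of a concrete class, the second setter would return a type with no known members, which matches the `Any` hint in the screenshot.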


Best,
Leonard



In reply to Averell's message of Apr 21, 2020, 17:47.

Re: Change to StreamingFileSink in Flink 1.10

Seth Wiesman
Hi All,

There is a bug in the builder that prevents it from compiling in Scala due to differences in type inference between Java and Scala [1]. It has already been resolved for 1.10.1 and 1.11. In the meantime, just go ahead and use casts or construct the object in a Java class.
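
A minimal sketch of the cast workaround, assuming a String stream like the one in Averell's simplified example and the Flink 1.10.0 streaming dependency on the classpath. SinkFactory and stringSink are hypothetical names, and the explicit [String] on DateTimeBucketAssigner is an addition for clarity. The cast mirrors the one Averell reported as working earlier in this thread and has not been checked independently.

```scala
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.RowFormatBuilder
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy

// Hypothetical helper object; the name is not from the thread.
object SinkFactory {

  def stringSink(path: String): StreamingFileSink[String] =
    StreamingFileSink
      .forRowFormat(new Path(path), new SimpleStringEncoder[String]())
      .withRollingPolicy(DefaultRollingPolicy.builder().build())
      .withBucketAssigner(new DateTimeBucketAssigner[String]())
      // Workaround: restore the static builder type that scalac loses here,
      // so that build() can be resolved again.
      .asInstanceOf[RowFormatBuilder[String, String, _]]
      .build()
}
```

The cast changes only the static type, not the object, so the resulting sink is the same one the Java API would build. Presumably the asInstanceOf line can be dropped after upgrading to a release that contains the fix.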

Seth


In reply to Leonard Xu's message of Tue, Apr 21, 2020, 7:33 AM.

Re: Change to StreamingFileSink in Flink 1.10

Leonard Xu
Thanks @Seth Wiesman.
Ah, I just realized I was using 1.10-SNAPSHOT locally, which is why I could not reproduce the bug.
@Averell, you can use casts for now and wait for 1.10.1, which will be released soon.

Best,
Leonard


In reply to Seth Wiesman's message of Apr 21, 2020, 22:03.

Re: Change to StreamingFileSink in Flink 1.10

Averell
In reply to this post by Seth Wiesman
Thanks @Seth Wiesman and all.


