IT's with POJO's

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

IT's with POJO's

Nick Dimiduk
Heya,

I'm writing my first flink streaming application and have a flow that
passes type checking and complies. I've written a simple end-to-end
test with junit, using StreamExecutionEnvironment#fromElements() to
provide a stream if valid and invalid test objects; the objects are
POJOs.

It seems I'm running into FLINK-2124. Is there a work-around I can use
with 0.9.1 release, perhaps another API through which I can pass in
the type information explicitly? Variants of #fromCollection also fail
with the same error, and I've tried registering my POJO explicitly
with StreamExecutionEnvironment#registerType, also to no effect.

Apologies to posting both on the JIRA and here; I'm new to the
community and don't know which means is most expedient for user
questions.

Thanks,
Nick
Reply | Threaded
Open this post in threaded view
|

Re: IT's with POJO's

rmetzger0
Hi Nick,

both JIRA and the mailing list are good. In this case I'd say JIRA would be better because then everybody has the full context of the discussion.

The issue is fixed in 0.10, which is not yet released. 

You can work around the issue by implementing a custom SourceFunction which returns the POJOs.

Here's an example of a source function which returns just integers.

StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Integer> src = see.addSource(new RichParallelSourceFunction<Integer>() {

boolean running = true;
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
int i = 0;
while (running) {
ctx.collect(i++);
}
}

@Override
public void cancel() {
running = false;
}
});

Let me know if you need further advice.
Robert

On Wed, Nov 4, 2015 at 6:59 PM, Nick Dimiduk <[hidden email]> wrote:
Heya,

I'm writing my first flink streaming application and have a flow that
passes type checking and complies. I've written a simple end-to-end
test with junit, using StreamExecutionEnvironment#fromElements() to
provide a stream if valid and invalid test objects; the objects are
POJOs.

It seems I'm running into FLINK-2124. Is there a work-around I can use
with 0.9.1 release, perhaps another API through which I can pass in
the type information explicitly? Variants of #fromCollection also fail
with the same error, and I've tried registering my POJO explicitly
with StreamExecutionEnvironment#registerType, also to no effect.

Apologies to posting both on the JIRA and here; I'm new to the
community and don't know which means is most expedient for user
questions.

Thanks,
Nick

Reply | Threaded
Open this post in threaded view
|

Re: IT's with POJO's

Nick Dimiduk
Thank you Robert, I'll give this a spin -- obvious now that you point
it out. I'll go ahead and continue the line on inquiry on the JIRA.

-n

On Thu, Nov 5, 2015 at 4:18 AM, Robert Metzger <[hidden email]> wrote:

> Hi Nick,
>
> both JIRA and the mailing list are good. In this case I'd say JIRA would be
> better because then everybody has the full context of the discussion.
>
> The issue is fixed in 0.10, which is not yet released.
>
> You can work around the issue by implementing a custom SourceFunction which
> returns the POJOs.
>
> Here's an example of a source function which returns just integers.
>
> StreamExecutionEnvironment see =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> DataStreamSource<Integer> src = see.addSource(new
> RichParallelSourceFunction<Integer>() {
>
>    boolean running = true;
>    @Override
>    public void run(SourceContext<Integer> ctx) throws Exception {
>       int i = 0;
>       while (running) {
>          ctx.collect(i++);
>       }
>    }
>
>    @Override
>    public void cancel() {
>       running = false;
>    }
> });
>
>
> Let me know if you need further advice.
>
> Robert
>
>
> On Wed, Nov 4, 2015 at 6:59 PM, Nick Dimiduk <[hidden email]> wrote:
>>
>> Heya,
>>
>> I'm writing my first flink streaming application and have a flow that
>> passes type checking and complies. I've written a simple end-to-end
>> test with junit, using StreamExecutionEnvironment#fromElements() to
>> provide a stream if valid and invalid test objects; the objects are
>> POJOs.
>>
>> It seems I'm running into FLINK-2124. Is there a work-around I can use
>> with 0.9.1 release, perhaps another API through which I can pass in
>> the type information explicitly? Variants of #fromCollection also fail
>> with the same error, and I've tried registering my POJO explicitly
>> with StreamExecutionEnvironment#registerType, also to no effect.
>>
>> Apologies to posting both on the JIRA and here; I'm new to the
>> community and don't know which means is most expedient for user
>> questions.
>>
>> Thanks,
>> Nick
>
>