Question about state processor data outputs

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

Question about state processor data outputs

Chen-Che Huang
Hi all,

We're going to use state processor to make our keyedstate data to be written to different files based on the keys. More specifically, we want our data to be written to files key1.txt, key2.txt, ..., and keyn.txt where the value with the same key is stored in the same file. In each file, the data may be stored as follows. As far as I know, I need to implement my own Sink (org.apache.flink.streaming.api.functions.sink.RichSinkFunction) to meet the requirement. However, I wonder is there a native way to achieve this without implementing my own Sink because using official solution is usually more efficient and reliable than doing it by myself.  Many thanks for any comment.

key1.txt
key1 value11
key1 value21
key1 value31

key2.txt
key2 value21
key2 value22
key2 value23

Best wishes,
Chen-Che Huang
Reply | Threaded
Open this post in threaded view
|

Re: Question about state processor data outputs

rmetzger0
Hey Chen-Che Huang,

I guess the StreamingFileSink is what you are looking for. It is documented here:  https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
I drafted a short example (that is not production ready), which does roughly what you are asking for:

Hope this helps!

Best,
Robert


On Thu, Apr 15, 2021 at 11:33 AM Chen-Che Huang <[hidden email]> wrote:
Hi all,

We're going to use state processor to make our keyedstate data to be written to different files based on the keys. More specifically, we want our data to be written to files key1.txt, key2.txt, ..., and keyn.txt where the value with the same key is stored in the same file. In each file, the data may be stored as follows. As far as I know, I need to implement my own Sink (org.apache.flink.streaming.api.functions.sink.RichSinkFunction) to meet the requirement. However, I wonder is there a native way to achieve this without implementing my own Sink because using official solution is usually more efficient and reliable than doing it by myself.  Many thanks for any comment.

key1.txt
key1 value11
key1 value21
key1 value31

key2.txt
key2 value21
key2 value22
key2 value23

Best wishes,
Chen-Che Huang
Reply | Threaded
Open this post in threaded view
|

Re: Question about state processor data outputs

Chen-Che Huang
Hi Robert,

Thanks for your code. It's really helpful!

However, with the readKeyedState api of state processor, we get dataset for our data instead of datastream and it seems the dataset doesn't support streamfilesink (not addSink method like datastream). If not, I need to transform the dataset to a datastream. I'm not sure it's doable based on https://www.alibabacloud.com/blog/deep-insights-into-flink-sql-flink-advanced-tutorials_596628. If it's doable, then I'll be able to solve our problem with applying streamfilesink to the transformed dataset.

Best wishes,
Chen-Che Huang

On 2021/04/15 19:23:43, Robert Metzger <[hidden email]> wrote:

> Hey Chen-Che Huang,
>
> I guess the StreamingFileSink is what you are looking for. It is documented
> here:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
> I drafted a short example (that is not production ready), which does
> roughly what you are asking for:
> https://gist.github.com/rmetzger/7d5dbdaa118c63f5875c8c9520cc311d
>
> Hope this helps!
>
> Best,
> Robert
>
>
> On Thu, Apr 15, 2021 at 11:33 AM Chen-Che Huang <[hidden email]> wrote:
>
> > Hi all,
> >
> > We're going to use state processor to make our keyedstate data to be
> > written to different files based on the keys. More specifically, we want
> > our data to be written to files key1.txt, key2.txt, ..., and keyn.txt where
> > the value with the same key is stored in the same file. In each file, the
> > data may be stored as follows. As far as I know, I need to implement my own
> > Sink (org.apache.flink.streaming.api.functions.sink.RichSinkFunction) to
> > meet the requirement. However, I wonder is there a native way to achieve
> > this without implementing my own Sink because using official solution is
> > usually more efficient and reliable than doing it by myself.  Many thanks
> > for any comment.
> >
> > key1.txt
> > key1 value11
> > key1 value21
> > key1 value31
> >
> > key2.txt
> > key2 value21
> > key2 value22
> > key2 value23
> >
> > Best wishes,
> > Chen-Che Huang
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Question about state processor data outputs

rmetzger0
Hi,
I assumed you are using the DataStream API, because you mentioned the streaming sink. But you also mentioned the state processor API (which I ignored a bit).

I wonder why you are using the state processor API. Can't you use the streaming job that created the state also for writing it to files using the StreamingFileSink?

If you want to stick to the DataSet API, then I guess you have to implement a custom (File)OutputFormat.


On Fri, Apr 16, 2021 at 5:37 AM Chen-Che Huang <[hidden email]> wrote:
Hi Robert,

Thanks for your code. It's really helpful!

However, with the readKeyedState api of state processor, we get dataset for our data instead of datastream and it seems the dataset doesn't support streamfilesink (not addSink method like datastream). If not, I need to transform the dataset to a datastream. I'm not sure it's doable based on https://www.alibabacloud.com/blog/deep-insights-into-flink-sql-flink-advanced-tutorials_596628. If it's doable, then I'll be able to solve our problem with applying streamfilesink to the transformed dataset.

Best wishes,
Chen-Che Huang

On 2021/04/15 19:23:43, Robert Metzger <[hidden email]> wrote:
> Hey Chen-Che Huang,
>
> I guess the StreamingFileSink is what you are looking for. It is documented
> here:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
> I drafted a short example (that is not production ready), which does
> roughly what you are asking for:
> https://gist.github.com/rmetzger/7d5dbdaa118c63f5875c8c9520cc311d
>
> Hope this helps!
>
> Best,
> Robert
>
>
> On Thu, Apr 15, 2021 at 11:33 AM Chen-Che Huang <[hidden email]> wrote:
>
> > Hi all,
> >
> > We're going to use state processor to make our keyedstate data to be
> > written to different files based on the keys. More specifically, we want
> > our data to be written to files key1.txt, key2.txt, ..., and keyn.txt where
> > the value with the same key is stored in the same file. In each file, the
> > data may be stored as follows. As far as I know, I need to implement my own
> > Sink (org.apache.flink.streaming.api.functions.sink.RichSinkFunction) to
> > meet the requirement. However, I wonder is there a native way to achieve
> > this without implementing my own Sink because using official solution is
> > usually more efficient and reliable than doing it by myself.  Many thanks
> > for any comment.
> >
> > key1.txt
> > key1 value11
> > key1 value21
> > key1 value31
> >
> > key2.txt
> > key2 value21
> > key2 value22
> > key2 value23
> >
> > Best wishes,
> > Chen-Che Huang
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Question about state processor data outputs

Chen-Che Huang
Hi Robert,

Due to some concerns, we planned to use state processor to achieve our goal. Now we will consider to reevaluate using datastream to do the job while exploring the possibility of implementing a custom FileOutputFormat. Thanks for your comments!

Best wishes,
Chen-Che Huang

On 2021/04/16 06:53:37, Robert Metzger <[hidden email]> wrote:

> Hi,
> I assumed you are using the DataStream API, because you mentioned the
> streaming sink. But you also mentioned the state processor API (which I
> ignored a bit).
>
> I wonder why you are using the state processor API. Can't you use the
> streaming job that created the state also for writing it to files using the
> StreamingFileSink?
>
> If you want to stick to the DataSet API, then I guess you have to implement
> a custom (File)OutputFormat.
>
>
> On Fri, Apr 16, 2021 at 5:37 AM Chen-Che Huang <[hidden email]> wrote:
>
> > Hi Robert,
> >
> > Thanks for your code. It's really helpful!
> >
> > However, with the readKeyedState api of state processor, we get dataset
> > for our data instead of datastream and it seems the dataset doesn't support
> > streamfilesink (not addSink method like datastream). If not, I need to
> > transform the dataset to a datastream. I'm not sure it's doable based on
> > https://www.alibabacloud.com/blog/deep-insights-into-flink-sql-flink-advanced-tutorials_596628.
> > If it's doable, then I'll be able to solve our problem with applying
> > streamfilesink to the transformed dataset.
> >
> > Best wishes,
> > Chen-Che Huang
> >
> > On 2021/04/15 19:23:43, Robert Metzger <[hidden email]> wrote:
> > > Hey Chen-Che Huang,
> > >
> > > I guess the StreamingFileSink is what you are looking for. It is
> > documented
> > > here:
> > >
> > https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
> > > I drafted a short example (that is not production ready), which does
> > > roughly what you are asking for:
> > > https://gist.github.com/rmetzger/7d5dbdaa118c63f5875c8c9520cc311d
> > >
> > > Hope this helps!
> > >
> > > Best,
> > > Robert
> > >
> > >
> > > On Thu, Apr 15, 2021 at 11:33 AM Chen-Che Huang <[hidden email]>
> > wrote:
> > >
> > > > Hi all,
> > > >
> > > > We're going to use state processor to make our keyedstate data to be
> > > > written to different files based on the keys. More specifically, we
> > want
> > > > our data to be written to files key1.txt, key2.txt, ..., and keyn.txt
> > where
> > > > the value with the same key is stored in the same file. In each file,
> > the
> > > > data may be stored as follows. As far as I know, I need to implement
> > my own
> > > > Sink (org.apache.flink.streaming.api.functions.sink.RichSinkFunction)
> > to
> > > > meet the requirement. However, I wonder is there a native way to
> > achieve
> > > > this without implementing my own Sink because using official solution
> > is
> > > > usually more efficient and reliable than doing it by myself.  Many
> > thanks
> > > > for any comment.
> > > >
> > > > key1.txt
> > > > key1 value11
> > > > key1 value21
> > > > key1 value31
> > > >
> > > > key2.txt
> > > > key2 value21
> > > > key2 value22
> > > > key2 value23
> > > >
> > > > Best wishes,
> > > > Chen-Che Huang
> > > >
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Question about state processor data outputs

Chen-Che Huang
In reply to this post by rmetzger0
Hi Robert,

Due to the performance issue of using state processor, I probably would like to give up state processor and am trying StreamingFileSink in a streaming manner. However, I need to store the files on GCS. However, I encountered the error below. It looks like Flink hasn't support GCS for StreamingFileSink (https://issues.apache.org/jira/browse/FLINK-11838). If you know any solution to this issue, please let me know. Thanks.
java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS
        at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:60)

Best regards,
Chen-Che

On 2021/04/16 06:53:37, Robert Metzger <[hidden email]> wrote:

> Hi,
> I assumed you are using the DataStream API, because you mentioned the
> streaming sink. But you also mentioned the state processor API (which I
> ignored a bit).
>
> I wonder why you are using the state processor API. Can't you use the
> streaming job that created the state also for writing it to files using the
> StreamingFileSink?
>
> If you want to stick to the DataSet API, then I guess you have to implement
> a custom (File)OutputFormat.
>
>
> On Fri, Apr 16, 2021 at 5:37 AM Chen-Che Huang <[hidden email]> wrote:
>
> > Hi Robert,
> >
> > Thanks for your code. It's really helpful!
> >
> > However, with the readKeyedState api of state processor, we get dataset
> > for our data instead of datastream and it seems the dataset doesn't support
> > streamfilesink (not addSink method like datastream). If not, I need to
> > transform the dataset to a datastream. I'm not sure it's doable based on
> > https://www.alibabacloud.com/blog/deep-insights-into-flink-sql-flink-advanced-tutorials_596628.
> > If it's doable, then I'll be able to solve our problem with applying
> > streamfilesink to the transformed dataset.
> >
> > Best wishes,
> > Chen-Che Huang
> >
> > On 2021/04/15 19:23:43, Robert Metzger <[hidden email]> wrote:
> > > Hey Chen-Che Huang,
> > >
> > > I guess the StreamingFileSink is what you are looking for. It is
> > documented
> > > here:
> > >
> > https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
> > > I drafted a short example (that is not production ready), which does
> > > roughly what you are asking for:
> > > https://gist.github.com/rmetzger/7d5dbdaa118c63f5875c8c9520cc311d
> > >
> > > Hope this helps!
> > >
> > > Best,
> > > Robert
> > >
> > >
> > > On Thu, Apr 15, 2021 at 11:33 AM Chen-Che Huang <[hidden email]>
> > wrote:
> > >
> > > > Hi all,
> > > >
> > > > We're going to use state processor to make our keyedstate data to be
> > > > written to different files based on the keys. More specifically, we
> > want
> > > > our data to be written to files key1.txt, key2.txt, ..., and keyn.txt
> > where
> > > > the value with the same key is stored in the same file. In each file,
> > the
> > > > data may be stored as follows. As far as I know, I need to implement
> > my own
> > > > Sink (org.apache.flink.streaming.api.functions.sink.RichSinkFunction)
> > to
> > > > meet the requirement. However, I wonder is there a native way to
> > achieve
> > > > this without implementing my own Sink because using official solution
> > is
> > > > usually more efficient and reliable than doing it by myself.  Many
> > thanks
> > > > for any comment.
> > > >
> > > > key1.txt
> > > > key1 value11
> > > > key1 value21
> > > > key1 value31
> > > >
> > > > key2.txt
> > > > key2 value21
> > > > key2 value22
> > > > key2 value23
> > > >
> > > > Best wishes,
> > > > Chen-Che Huang
> > > >
> > >
> >
>