Hi,
In our Flink pipeline we are currently writing data to multiple S3 objects/folders based on some conditions. The issue I am facing is as follows.
Consider these S3 folders:
temp_bucket/processedData/20160823/
temp_bucket/rawData/20160822/
temp_bucket/errorData/20160821/
When the parallelism is set to 1, the data gets written to all of the S3 folders above, but when I set it to a larger value, the data is written only to the first folder and not to the others.
I am testing the Flink job on EMR with 4 task managers having 16 slots; even if I keep the parallelism at 4, I face the same issue. (Running from the IDE results in the same output. I tested this with Flink 1.0.3 and 1.1.1.)
I do not understand why this is happening.
Regards,
Vinay Patil
Hi Vinay,
Does this only happen with the S3 file system or also with your local file system? Could you share some example code or log output of your running job?
Best,
Max
On Wed, Aug 24, 2016 at 4:20 AM, Vinay Patil <[hidden email]> wrote:
Hi Max,
I tried writing to the local file system as well, and it gives me the same issue. I have attached the logs and the dummy pipeline code: logs.txt, dummy_pipeline.txt
Hi,
Just an update: the window is not getting triggered when I change the parallelism to more than 1. Can you please explain why this is happening?
Regards,
Vinay Patil
On Wed, Aug 24, 2016 at 9:55 AM, vinay patil [via Apache Flink User Mailing List archive.] <[hidden email]> wrote:
I'm assuming there is something wrong with your Watermark/Timestamp assigner. Could you share some of the code?
On Wed, Aug 24, 2016 at 9:54 PM, vinay patil <[hidden email]> wrote:
> Just an update, the window is not getting triggered when I change the parallelism to more than 1.
> View this message in context: Re: Dealing with Multiple sinks in Flink
Hi Max,
Here is the code for the timestamp assigner and watermark generation; please find it attached: MyTimestampExtractor.java (4K)
Regards,
Vinay Patil
On Thu, Aug 25, 2016 at 7:39 AM, Maximilian Michels [via Apache Flink User Mailing List archive.] <[hidden email]> wrote:
Flink 1.1.1 has a metric for exposing the low watermark of each operator. Maybe you can access a TaskManager via JMX to see the value of the WM. Watermarks are sometimes a bit tricky. On Thu, Aug 25, 2016 at 4:29 PM, vinay patil <[hidden email]> wrote:
Thanks Robert, I will try that. Can you provide more details on how to set that up? I added print statements (sysouts) to check whether the watermarks are being generated, and I do see the values, but as I said, the window is not triggered for parallelism greater than 1. I have also tried AscendingTimestampExtractor but face the same issue.
Regards,
Vinay Patil
On Thu, Aug 25, 2016 at 1:32 PM, rmetzger0 [via Apache Flink User Mailing List archive.] <[hidden email]> wrote:
I would first try to understand the metrics system locally, for example using the VisualVM tool, which also allows you to access JMX-exported metrics. Once you've seen how it works, you can look into remote JMX access. There's a page in the Flink documentation about the metrics.
On Thu, Aug 25, 2016 at 8:36 PM, vinay patil <[hidden email]> wrote:
Hi Robert,
I had resolved this issue earlier: I had not set the Kafka source parallelism to the number of partitions, which is why the window was not getting triggered. Now I am facing the same issue again. I tried to check the watermark value using VisualVM locally, but I do not see that value there; I have attached a snapshot of VisualVM. Just to verify: the JMX port is 9010 by default, right? When I tried to connect to it locally, I could not connect.
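The earlier fix (matching the Kafka source parallelism to the number of partitions) is consistent with how event-time operators track watermarks: an operator with several input channels advances its event-time clock to the minimum watermark seen across those channels, so a source subtask that has no partition and never emits a watermark can hold back every downstream window. The sketch below is a plain-Java model of that rule, not Flink code; the class name, method names, and sample values are hypothetical and chosen only for illustration.

```java
import java.util.Arrays;

/** Plain-Java model (no Flink dependency) of how an operator combines input watermarks. */
public class WatermarkDemo {

    /** Value a channel holds before it has received any watermark (assumed initial state). */
    public static final long NO_WATERMARK = Long.MIN_VALUE;

    /** The operator's event-time clock is the minimum watermark over all input channels. */
    public static long combinedWatermark(long[] channelWatermarks) {
        return Arrays.stream(channelWatermarks).min().orElse(NO_WATERMARK);
    }

    /** A window [start, end) fires once the combined watermark reaches end - 1. */
    public static boolean windowFires(long windowEnd, long[] channelWatermarks) {
        return combinedWatermark(channelWatermarks) >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long windowEnd = 10_000L;

        // Both parallel source subtasks read a partition and emit watermarks.
        long[] allActive = {12_000L, 11_500L};

        // One subtask has no partition assigned, so it never emits a watermark.
        long[] oneIdle = {12_000L, NO_WATERMARK};

        System.out.println("all active -> fires: " + windowFires(windowEnd, allActive));
        System.out.println("one idle   -> fires: " + windowFires(windowEnd, oneIdle));
    }
}
```

In this model the window fires only in the first case, which mirrors the symptom in this thread: watermarks are visibly generated on the active subtask, yet the window never triggers while another input stays silent.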
Hi Vinay,
the JMX port depends on the port you've configured for the JMX metrics reporter. Did you configure it?
Regards,
Robert
On Fri, Dec 2, 2016 at 11:14 AM, vinay patil <[hidden email]> wrote:
Yes, I had configured it as given in the documentation. I can see this line in the Job Manager logs: "Started JMX server on port 9020" (but this was on EMR). How do I do this locally? Can we check these metrics while running the pipeline from the IDE? If yes, what is the default JMX port to connect to, or do we need to do some configuration locally?
Regards,
Vinay Patil
On Mon, Dec 5, 2016 at 4:14 PM, rmetzger0 [via Apache Flink User Mailing List archive.] <[hidden email]> wrote:
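For enabling JMX when starting Flink from your IDE, you need to do the following:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    // Register a JMX reporter and let it pick a free port in the given range.
    Configuration configuration = new Configuration();
    configuration.setString("metrics.reporters", "my_jmx_reporter");
    configuration.setString("metrics.reporter.my_jmx_reporter.class", "org.apache.flink.metrics.jmx.JMXReporter");
    configuration.setString("metrics.reporter.my_jmx_reporter.port", "9020-9040");
    StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironment(1, configuration);

On Mon, Dec 5, 2016 at 11:56 AM, vinay patil <[hidden email]> wrote: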
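When the job runs inside the IDE, the metrics live in the same JVM as your own code, so one way to check what is actually exported is to list the registered MBeans directly instead of connecting through a port. The following is a minimal sketch using only the standard javax.management API. It assumes the reporter registers its metrics with the JVM's platform MBean server, and the class and method names are hypothetical. The demo queries the "java.lang" domain because it always exists; the domain prefix under which Flink publishes its metrics (possibly something like "org.apache.flink") is an assumption you would need to verify in VisualVM.

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import java.util.TreeSet;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

/** Lists MBeans registered in the current JVM, filtered by JMX domain prefix. */
public class ListMBeans {

    /** Returns the sorted canonical names of all MBeans whose domain starts with the prefix. */
    public static Set<String> namesInDomain(String domainPrefix) {
        ObjectName pattern;
        try {
            // "<prefix>*:*" matches any domain starting with the prefix and any key properties.
            pattern = new ObjectName(domainPrefix + "*:*");
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("Invalid JMX domain prefix: " + domainPrefix, e);
        }
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        Set<String> names = new TreeSet<>();
        for (ObjectName name : server.queryNames(pattern, null)) {
            names.add(name.getCanonicalName());
        }
        return names;
    }

    public static void main(String[] args) {
        // "java.lang" is used here only because these MBeans exist in every JVM.
        namesInDomain("java.lang").forEach(System.out::println);
    }
}
```

If a call with the Flink domain prefix returns an empty set while the job is running, the reporter is most likely not active, which would point at the configuration rather than at the watermark logic.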