Flink Stream job to parquet sink


anuj.aj07
Hello All,

I am receiving a set of events in Avro format on different topics. I want to consume these and write them to S3 in Parquet format.
I have written the job below, which creates a separate stream for each event and fetches its schema from the Confluent Schema Registry to build a Parquet sink for that event.
This is working fine, but the one problem I am facing is that whenever a new event starts coming in, I have to change the YAML config and restart the job. Is there any way to avoid restarting the job so that it starts consuming the new set of events on its own?


YAML config:

!com.bounce.config.EventTopologyConfig
eventsType:
  - !com.bounce.config.EventConfig
    event_name: "search_list_keyless"
    schema_subject: "search_list_keyless-com.bounce.events.keyless.bookingflow.search_list_keyless"
    topic: "search_list_keyless"

  - !com.bounce.config.EventConfig
    event_name: "bike_search_details"
    schema_subject: "bike_search_details-com.bounce.events.keyless.bookingflow.bike_search_details"
    topic: "bike_search_details"

  - !com.bounce.config.EventConfig
    event_name: "keyless_bike_lock"
    schema_subject: "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_lock"
    topic: "analytics-keyless"

  - !com.bounce.config.EventConfig
    event_name: "keyless_bike_unlock"
    schema_subject: "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_unlock"
    topic: "analytics-keyless"

checkPointInterval: 1200000

topics: ["search_list_keyless", "bike_search_details", "analytics-keyless"]



Sink code:

YamlReader reader = new YamlReader(topologyConfig);
EventTopologyConfig eventTopologyConfig = reader.read(EventTopologyConfig.class);

long checkPointInterval = eventTopologyConfig.getCheckPointInterval();
topics = eventTopologyConfig.getTopics();

List<EventConfig> eventTypesList = eventTopologyConfig.getEventsType();

CachedSchemaRegistryClient registryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 1000);

FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer(topics,
        new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
        properties);

DataStream<GenericRecord> dataStream = streamExecutionEnvironment
        .addSource(flinkKafkaConsumer)
        .name("source");

try {
    for (EventConfig eventConfig : eventTypesList) {

        LOG.info("creating a stream for {}", eventConfig.getEvent_name());

        // One Parquet sink per event type, using the Avro schema fetched from the schema registry.
        final StreamingFileSink<GenericRecord> sink = StreamingFileSink
                .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(
                        SchemaUtils.getSchema(eventConfig.getSchema_subject(), registryClient)))
                .withBucketAssigner(new EventTimeBucketAssigner())
                .build();

        // Route only the records belonging to this event type to its sink.
        DataStream<GenericRecord> outStream = dataStream.filter((FilterFunction<GenericRecord>) genericRecord ->
                genericRecord != null
                        && genericRecord.get(EVENT_NAME).toString().equals(eventConfig.getEvent_name()));

        outStream.addSink(sink).name(eventConfig.getSink_id()).setParallelism(parallelism);
    }
} catch (Exception e) {
    e.printStackTrace();
}
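
For reference, SchemaUtils.getSchema just resolves the Avro schema for a subject from the registry; a minimal version of that helper could look like this (a sketch, not necessarily the exact code I am running):

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import org.apache.avro.Schema;

public class SchemaUtils {

    // Looks up the latest schema version registered under the given subject
    // and parses it into an Avro Schema usable by ParquetAvroWriters.
    public static Schema getSchema(String subject, CachedSchemaRegistryClient registryClient) throws Exception {
        SchemaMetadata metadata = registryClient.getLatestSchemaMetadata(subject);
        return new Schema.Parser().parse(metadata.getSchema());
    }
}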



--
Thanks & Regards,
Anuj Jain




Re: Flink Stream job to parquet sink

anuj.aj07
Please help with this. Any suggestions?


Re: Flink Stream job to parquet sink

anuj.aj07
I am stuck on this. Please give some suggestions.


Re: Flink Stream job to parquet sink

Arvid Heise-3
Hi Anuj,

There is currently no way to dynamically change the topology. It would be good to know why your current approach is not working (is the restart taking too long? Are the changes too frequent?).

So, some ideas:
- Have some kind of submitter that restarts Flink automatically on a config change (this assumes that the restart time is not the issue). A rough sketch of such a watcher follows below.
- Your job could periodically check for a change and then fail. However, you need to fail in a way that causes the topology to be rebuilt. It is essentially a fatal error; a driver then handles that error and resubmits the job.
- Rewrite the job so that you have only one sink in your job graph, which demultiplexes the events to several internal sinks. Then you could simply add a new internal sink whenever a new event type occurs.

The first option is the easiest and the last option is the most versatile (you could even mix different sink types).
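
For the first option, a minimal sketch of a standalone watcher (an assumption on my side: it runs next to your job submission tooling, and the actual restart command is left as a placeholder because it depends on your deployment):

import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;

public class ConfigWatcher {

    public static void main(String[] args) throws Exception {
        Path configFile = Paths.get(args[0]); // path to the event topology YAML
        WatchService watchService = FileSystems.getDefault().newWatchService();
        configFile.getParent().register(watchService, StandardWatchEventKinds.ENTRY_MODIFY);

        while (true) {
            WatchKey key = watchService.take(); // blocks until something in the directory changes
            for (WatchEvent<?> event : key.pollEvents()) {
                if (configFile.getFileName().equals(event.context())) {
                    restartJob();
                }
            }
            key.reset();
        }
    }

    private static void restartJob() {
        // Placeholder: stop the running job with a savepoint and resubmit it so that
        // main() rebuilds the topology from the new config, e.g. via the Flink CLI or REST API.
    }
}

This does not avoid the restart itself, of course; it only automates it.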


--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   

Re: Flink Stream job to parquet sink

Rafi Aroch
Hi Arvid,

Would it be possible to implement a BucketAssigner that, for example, loads the configuration periodically from an external source and, based on the event type, writes to a different sub-folder?
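
Something along these lines is what I have in mind (just a sketch; the config-loading part is omitted and I am assuming every record carries an event_name field):

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

public class EventNameBucketAssigner implements BucketAssigner<GenericRecord, String> {

    @Override
    public String getBucketId(GenericRecord element, BucketAssigner.Context context) {
        // Route each record into a sub-folder named after its event type,
        // e.g. s3://bucket/base-path/<event_name>/...
        return element.get("event_name").toString();
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}

The part this alone does not solve is the per-event Avro schema for the Parquet writer, which is still fixed when the sink is built.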

Thanks,
Rafi



Re: Flink Stream job to parquet sink

anuj.aj07
Thanks, Arvid, for the detailed answers.

- Have some kind of submitter that restarts Flink automatically on a config change (assumes that the restart time is not the issue).
  Yes, that can be written, but it does not solve the problem completely because I want to avoid the job restart itself. Every time I restart, I also have to resume from the last checkpoint in S3.

- Your job could periodically check for a change and then fail. However, you need to fail in a way that causes the topology to be rebuilt. It is essentially a fatal error; a driver then handles that error and resubmits the job.
  Again, the same reason as for the first option, and I am also not sure how it would be implemented.

- Rewrite the job so that you have only one sink in your job graph, which demultiplexes the events to several internal sinks. Then you could simply add a new internal sink whenever a new event type occurs.
  Please give me some idea of how this could be implemented; this is the one I want to try.

I am just wondering: can I use broadcast state, where the config rules I am currently keeping in YAML are instead pushed into Kafka itself? I mainly need only the event name to Avro schema subject mapping.
Please correct me if I am thinking in the wrong direction.
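
To make it concrete, roughly what I have in mind (just a sketch; the config topic, the Tuple2 config stream, and the field names are assumptions on my side):

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class ConfigBroadcastExample {

    // Broadcast state: event name -> schema registry subject.
    static final MapStateDescriptor<String, String> CONFIG_DESCRIPTOR =
            new MapStateDescriptor<>("eventToSchemaSubject", Types.STRING, Types.STRING);

    // configStream would be another Kafka source reading (event_name, schema_subject) pairs
    // from a dedicated config topic.
    public static DataStream<GenericRecord> withConfig(DataStream<GenericRecord> events,
                                                       DataStream<Tuple2<String, String>> configStream) {

        BroadcastStream<Tuple2<String, String>> broadcastConfig = configStream.broadcast(CONFIG_DESCRIPTOR);

        return events
                .connect(broadcastConfig)
                .process(new BroadcastProcessFunction<GenericRecord, Tuple2<String, String>, GenericRecord>() {

                    @Override
                    public void processElement(GenericRecord record, ReadOnlyContext ctx,
                                               Collector<GenericRecord> out) throws Exception {
                        String eventName = record.get("event_name").toString();
                        // Only forward events we already know about; a downstream sink can use
                        // the same mapping to pick the right Parquet writer.
                        if (ctx.getBroadcastState(CONFIG_DESCRIPTOR).contains(eventName)) {
                            out.collect(record);
                        }
                    }

                    @Override
                    public void processBroadcastElement(Tuple2<String, String> config, Context ctx,
                                                        Collector<GenericRecord> out) throws Exception {
                        // A new event type announced on the config topic: remember its schema subject.
                        ctx.getBroadcastState(CONFIG_DESCRIPTOR).put(config.f0, config.f1);
                    }
                });
    }
}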




Re: Flink Stream job to parquet sink

Arvid Heise-3
Hi Anuj,

Yes, broadcast sounds really good. Now you just need to hide the structural variability (the varying number of sinks) from the job graph by delegating to inner sinks inside a single sink function.

public class SplittingStreamingFileSink<IN>
        extends RichSinkFunction<IN>
        implements CheckpointedFunction, CheckpointListener {

    // Event name -> delegate sink, created lazily.
    Map<String, StreamingFileSink> sinks = ...;

    // Delegate each method of RichSinkFunction, CheckpointedFunction, and CheckpointListener to the inner sinks.

    public void invoke(IN value, SinkFunction.Context context) throws Exception {
        StreamingFileSink sink = sinks.get(value.getEventName());
        if (sink == null) {
            // Create a new StreamingFileSink, add it to sinks, and call initializeState on it.
        }
        sink.invoke(value, context);
    }
}
Use the SplittingStreamingFileSink as the only sink in your workflow. You definitely need to wrap the SinkFunction.Context so that state gets prefixed with the event name, but that should be rather straightforward.
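
Wiring it up would then look roughly like this (the constructor arguments are placeholders; how you build the inner sinks is up to you):

// All events go through the single delegating sink; new event types just add new inner sinks at runtime.
dataStream
        .addSink(new SplittingStreamingFileSink<>(basePath, registryClient))
        .name("splitting-parquet-sink")
        .setParallelism(parallelism);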


Re: Flink Stream job to parquet sink

anuj.aj07
Thanks, Arvid. I will try to implement this using the broadcast approach.
