Enable multiple Kafka consumer sources for job

2 messages

sudhansu069
Hi Team,

We are trying to build a data pipeline where we have to set up two different Kafka consumers for two different Kafka topics, feeding a single SNS sink.
Below is sample code for this, but it looks like events from one of the sources are not flowing into the job. We are using the union API to merge the two input sources here.


DataStream<Tuple2<String, AuditEvent>> inputStream1 = env.addSource(flinkKafkaConsumer)
        .uid(configParams.get(AppConstant.JOB_PUBLISHER_STATE_KAFKA_SOURCE_UUID))
        .name(AppConstant.FHIR_SOURCE);

DataStream<Tuple2<String, AuditEvent>> inputStream2 = env.addSource(flinkKafkaConsumerFromRejectedTopic)
        .uid("testUID")
        .name(AppConstant.FHIR_SOURCE_FOR_REJECTED_QUEUE);

DataStream<Tuple2<String, AuditEvent>> allStreams = inputStream1.union(inputStream2);

In the above code snippet, allStreams is only pulling events from inputStream1, but the expectation is that allStreams should pull events from both inputStream1 and inputStream2. Could you please help us understand whether this is the right approach or whether we are missing something?
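
For reference, a minimal self-contained sketch of the same pattern, assuming the legacy FlinkKafkaConsumer API and hypothetical topic names, broker address, and a print sink standing in for the SNS sink (plain String records instead of Tuple2<String, AuditEvent>, to keep it runnable without the custom deserializer):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class TwoTopicUnionJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical broker and group settings; adjust to your environment.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "two-topic-union-demo");

        // One consumer per topic; "main-topic" and "rejected-topic" are placeholders.
        FlinkKafkaConsumer<String> mainConsumer =
                new FlinkKafkaConsumer<>("main-topic", new SimpleStringSchema(), props);
        FlinkKafkaConsumer<String> rejectedConsumer =
                new FlinkKafkaConsumer<>("rejected-topic", new SimpleStringSchema(), props);

        // Each source gets its own uid so checkpointed state maps back correctly.
        DataStream<String> mainStream = env
                .addSource(mainConsumer)
                .uid("main-source-uid")
                .name("main-source");

        DataStream<String> rejectedStream = env
                .addSource(rejectedConsumer)
                .uid("rejected-source-uid")
                .name("rejected-source");

        // union() merges streams of the same type; records from both sources
        // flow downstream interleaved, with no ordering guarantee across them.
        DataStream<String> allStreams = mainStream.union(rejectedStream);

        // Print sink as a stand-in for the SNS sink.
        allStreams.print();

        env.execute("two-topic-union");
    }
}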

Thanks,
Sudhansu

Re: Enable multiple Kafka consumer sources for job

liujiangang
For debugging, you can pull data from inputStream2 alone first, to check whether that source produces any data at all; see the sketch below.
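
A sketch along those lines, reusing the hypothetical env, props, topic name, and SimpleStringSchema from the snippet above:

// Debug sketch: consume only the rejected topic, bypassing union(),
// to verify that this source emits records at all.
DataStream<String> rejectedStream = env
        .addSource(new FlinkKafkaConsumer<>("rejected-topic", new SimpleStringSchema(), props))
        .uid("rejected-source-uid")
        .name("rejected-source");

// If nothing prints, the problem is upstream of union().
rejectedStream.print();

env.execute("debug-rejected-source");

If no records show up here, the issue lies in the source itself (topic name, start offsets, or consumer configuration) rather than in union().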
