I am trying the below piece of code to create multiple DataStream objects and store them in a map.

    for (EventConfig eventConfig : eventTypesList) {
        LOGGER.info("creating a stream for {}", eventConfig.getEvent_name());
        String key = eventConfig.getEvent_name();

        final StreamingFileSink<GenericRecord> sink = StreamingFileSink
                .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(
                        SchemaUtils.getSchema(eventConfig.getSchema_subject())))
                .withBucketAssigner(new EventTimeBucketAssigner())
                .build();

        // Keep only the records whose event name matches this config entry.
        DataStream<GenericRecord> stream = dataStream.filter(
                (FilterFunction<GenericRecord>) genericRecord ->
                        genericRecord.get(EVENT_NAME).toString().equals(eventConfig.getEvent_name()));

        streamMap.put(key, new Tuple2<>(stream, sink));
    }

    DataStream<GenericRecord> searchStream = streamMap.get(SEARCH_LIST_KEYLESS).getField(0);
    searchStream.map(new Enricher())
            .addSink(streamMap.get(SEARCH_LIST_KEYLESS).getField(1));

I am getting a NullPointerException when trying to get the stream from the map value, at:

    DataStream<GenericRecord> searchStream = streamMap.get(SEARCH_LIST_KEYLESS).getField(0);

As per my understanding, this is because the map is local to main and not broadcast to the tasks. If that is the case, how should I do this? Please help me resolve it.
As I understand from the code, streamMap is a Java map, not a Scala one, so you can get an NPE when dereferencing the value you got from it (Map.get returns null for a missing key). Also, the approach looks a bit strange. Can you describe what you are trying to achieve?

Regards,
Roman
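For example, a defensive lookup like the following would fail fast with a clearer message. This is a minimal sketch reusing the key constant and types from your snippet; f0/f1 are the field accessors of Flink's Tuple2:

    Tuple2<DataStream<GenericRecord>, StreamingFileSink<GenericRecord>> entry =
            streamMap.get(SEARCH_LIST_KEYLESS);
    if (entry == null) {
        // Map.get returned null: no EventConfig produced this event name,
        // or the key differs from what was put into the map.
        throw new IllegalStateException("No stream registered for key "
                + SEARCH_LIST_KEYLESS + "; registered keys: " + streamMap.keySet());
    }
    DataStream<GenericRecord> searchStream = entry.f0;
    searchStream.map(new Enricher()).addSink(entry.f1);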
On Mon, Feb 24, 2020 at 5:47 PM aj <[hidden email]> wrote:
Hi Khachatryan,

This is the use case for creating multiple streams: multiple types of Avro records arrive in a single Kafka topic, since we are using TopicRecordNameStrategy for the subject in the schema registry. I have written a consumer that reads this topic and builds a DataStream<GenericRecord>. I cannot sink this stream to hdfs/s3 in Parquet format directly, because it contains records with different schemas. So I am filtering out the records for each type, creating a separate stream per type, and then sinking each stream separately.

Can you please help me create multiple dynamic streams with the code that I shared? How do I resolve this issue?
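For context, this is roughly how the GenericRecord stream is built. It is a simplified sketch of my consumer, not the exact code; the idea is to wrap Confluent's KafkaAvroDeserializer so that every record type in the topic comes out as a GenericRecord:

    import io.confluent.kafka.serializers.KafkaAvroDeserializer;
    import java.util.Collections;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    public class GenericRecordDeserializationSchema
            implements KafkaDeserializationSchema<GenericRecord> {

        private final String registryUrl;
        private transient KafkaAvroDeserializer inner; // not serializable, create lazily

        public GenericRecordDeserializationSchema(String registryUrl) {
            this.registryUrl = registryUrl;
        }

        @Override
        public GenericRecord deserialize(ConsumerRecord<byte[], byte[]> record) {
            if (inner == null) {
                inner = new KafkaAvroDeserializer();
                inner.configure(
                        Collections.singletonMap("schema.registry.url", registryUrl),
                        false); // false = configure as a value deserializer
            }
            // The registry resolves the writer schema per record, so any event
            // type in the topic deserializes into a GenericRecord.
            return (GenericRecord) inner.deserialize(record.topic(), record.value());
        }

        @Override
        public boolean isEndOfStream(GenericRecord next) {
            return false;
        }

        @Override
        public TypeInformation<GenericRecord> getProducedType() {
            // Note: in practice GenericRecord may need a custom TypeInformation
            // or serializer; this fallback uses Kryo.
            return TypeInformation.of(GenericRecord.class);
        }
    }

The stream is then created with new FlinkKafkaConsumer<>(topic, new GenericRecordDeserializationSchema(registryUrl), kafkaProps).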
On Tue, Feb 25, 2020 at 10:46 PM Khachatryan Roman <[hidden email]> wrote:
Hi Anuj,

Is the piece of code from your first mail in the same main? If so, nothing has been executed in Flink at that point, so we are looking at a normal Java programming error that you can easily debug and unit test. Most likely, there is no event config for SEARCH_LIST_KEYLESS, or there is a spelling error in the key.
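For example, a quick unit test along these lines would catch it. Here buildStreams is a hypothetical method, i.e. your for-loop extracted so that it returns the map it populates:

    import static org.junit.Assert.assertTrue;

    import java.util.Map;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.junit.Test;

    public class StreamMapTest {
        @Test
        public void streamMapContainsConfiguredEventNames() {
            // buildStreams is a hypothetical extraction of your for-loop.
            Map<String, Tuple2<DataStream<GenericRecord>, StreamingFileSink<GenericRecord>>> map =
                    buildStreams(eventTypesList, dataStream);

            // Every configured event name should have an entry...
            for (EventConfig config : eventTypesList) {
                assertTrue(map.containsKey(config.getEvent_name()));
            }
            // ...including the one you look up later.
            assertTrue(map.containsKey(SEARCH_LIST_KEYLESS));
        }
    }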
On Thu, Feb 27, 2020 at 9:23 AM aj <[hidden email]> wrote: