Map Of DataStream getting NullPointerException


Map Of DataStream getting NullPointerException

anuj.aj07

I am trying the below piece of code to create multiple DataStream objects and store them in a map.

for (EventConfig eventConfig : eventTypesList) {
    // Note: SLF4J needs a {} placeholder here, otherwise the argument is silently dropped.
    LOGGER.info("creating a stream for {}", eventConfig.getEvent_name());
    String key = eventConfig.getEvent_name();

    // One Parquet sink per event type, using that type's Avro schema.
    final StreamingFileSink<GenericRecord> sink = StreamingFileSink
            .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(
                    SchemaUtils.getSchema(eventConfig.getSchema_subject())))
            .withBucketAssigner(new EventTimeBucketAssigner())
            .build();

    // One filtered sub-stream per event type.
    DataStream<GenericRecord> stream = dataStream
            .filter((FilterFunction<GenericRecord>) genericRecord ->
                    genericRecord.get(EVENT_NAME).toString()
                            .equals(eventConfig.getEvent_name()));

    streamMap.put(key, new Tuple2<>(stream, sink));
}

DataStream<GenericRecord> searchStream = streamMap.get(SEARCH_LIST_KEYLESS).getField(0);
searchStream.map(new Enricher()).addSink(streamMap.get(SEARCH_LIST_KEYLESS).getField(1));


I am getting a NullPointerException when trying to get the stream from the map value at:

DataStream<GenericRecord> searchStream = streamMap.get(SEARCH_LIST_KEYLESS).getField(0);

As per my understanding, this is because the map is local to main() and not broadcast to the tasks.
If I want to do this, how should I do it? Please help me resolve this.



--
Thanks & Regards,
Anuj Jain




Re: Map Of DataStream getting NullPointerException

r_khachatryan
As I understand from the code, streamMap is a Java map, not a Scala one, so you can get an NPE when dereferencing the value you got from it: Map.get returns null for a key that is not present.
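
For illustration, this is standard java.util.Map behavior and easy to reproduce in isolation (the key names below are made up for the example):

    import java.util.HashMap;
    import java.util.Map;

    public class MapNpeDemo {
        public static void main(String[] args) {
            Map<String, String> streams = new HashMap<>();
            streams.put("click_event", "click stream");

            // Map.get returns null for a key that was never put into the map...
            String missing = streams.get("search_list_keyless");

            // ...so invoking any method on the result throws a NullPointerException.
            System.out.println(missing.length());
        }
    }

So the first thing to check is whether SEARCH_LIST_KEYLESS was actually inserted as a key in your loop.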

Also, the approach looks a bit strange.
Can you describe what you are trying to achieve?

Regards,
Roman



Re: Map Of DataStream getting NullPointerException

anuj.aj07

Hi Khachatryan,

This is the use case for creating multiple streams:

Multiple types of Avro records arrive on a single Kafka topic, because we are using TopicRecordNameStrategy for the subject in the schema registry. I have written a consumer that reads this topic and builds a DataStream of GenericRecord. I cannot sink this stream to HDFS/S3 in Parquet format directly, since it contains records with different schemas. So I filter the records by type, creating a separate stream per event type, and then sink each stream separately.

So can you please help me create multiple dynamic streams with the code that I shared? How do I resolve this issue?
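
In code terms, this is roughly the topology I am after; one way to avoid the map lookup entirely would be to attach each sink inside the loop (a rough sketch, reusing dataStream, path, eventTypesList, EVENT_NAME, SchemaUtils, and EventTimeBucketAssigner from my earlier snippet):

    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

    for (EventConfig eventConfig : eventTypesList) {
        final String eventName = eventConfig.getEvent_name();

        // One filtered sub-stream per event type.
        DataStream<GenericRecord> typedStream = dataStream
                .filter((FilterFunction<GenericRecord>) genericRecord ->
                        eventName.equals(genericRecord.get(EVENT_NAME).toString()));

        // One Parquet sink per event type, using that type's Avro schema.
        StreamingFileSink<GenericRecord> sink = StreamingFileSink
                .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(
                        SchemaUtils.getSchema(eventConfig.getSchema_subject())))
                .withBucketAssigner(new EventTimeBucketAssigner())
                .build();

        // Wiring the sink here avoids looking the pair up from a map later.
        // (The search stream would additionally get .map(new Enricher()) first.)
        typedStream.addSink(sink);
    }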







--
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07




Re: Map Of DataStream getting NullPointerException

Arvid Heise-3
Hi Anuj,

is that piece of code from your first mail all in the same main()? Then at this point, nothing has been executed in Flink yet.

So we are looking at a plain Java programming error that you can easily debug and unit test.
Most likely, there is no event config for SEARCH_LIST_KEYLESS, or there is a spelling error in the key.
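
For example, a fail-fast lookup makes the real problem visible immediately (just a sketch, reusing streamMap, SEARCH_LIST_KEYLESS, and Enricher from your snippet):

    // Fail fast with a descriptive message instead of an opaque NPE later on.
    Tuple2<DataStream<GenericRecord>, StreamingFileSink<GenericRecord>> pair =
            streamMap.get(SEARCH_LIST_KEYLESS);
    if (pair == null) {
        throw new IllegalStateException("No stream registered for key '"
                + SEARCH_LIST_KEYLESS + "'; available keys: " + streamMap.keySet());
    }
    // Tuple2 exposes its elements as the typed fields f0 and f1.
    DataStream<GenericRecord> searchStream = pair.f0;
    searchStream.map(new Enricher()).addSink(pair.f1);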
