Hi Alex,
I'm also not entirely sure what the problem is.
My guess is that you expect kafkaProducer to be created in the main() method and then used in the filter function, but it turns out to be null.
If that is the case, then I think it is a serialisation problem. I am not sure how this works for var fields in Scala, but
the field has probably not been properly serialised/deserialised.
The problem is that you create your kafkaProducer in main() of the job-submitting client (one JVM process),
but the actual computation happens in other JVM processes in the cluster. Your filter function and all of its dependencies (implicitly including kafkaProducer)
have to go through serialisation/deserialisation to be transferred to another machine, and a KafkaProducer does not survive that.
It works locally because everything runs in the same JVM process.
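For illustration, here is a minimal sketch of the pattern I suspect you have; the job structure, topic name, validity check, and broker address are my assumptions:

import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object Job {
  // Initialised only in main(), i.e. only in the client JVM; a task manager
  // loads this object fresh, so the field is still null there.
  var kafkaProducer: KafkaProducer[String, String] = _

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val props = new Properties()
    props.put("bootstrap.servers", "broker:9092") // hypothetical address
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    kafkaProducer = new KafkaProducer[String, String](props)

    env.fromElements("ok", "")
      .filter { record =>
        val valid = record.nonEmpty // hypothetical validity check
        if (!valid) {
          // NullPointerException on the cluster: main() never ran there
          kafkaProducer.send(new ProducerRecord[String, String]("errors", record))
        }
        valid
      }
      .print()

    env.execute("producer-in-main sketch")
  }
}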
The general recommendation is to create this kind of heavy object inside the cluster JVM process, e.g. in the open() method of a rich filter function.
Only simple POJO objects are usually a good fit for going through serialisation/deserialisation.
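A minimal sketch of that approach; again, the topic name, broker address, and validity check are placeholders:

import java.util.Properties
import org.apache.flink.api.common.functions.RichFilterFunction
import org.apache.flink.configuration.Configuration
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

class ErrorReportingFilter extends RichFilterFunction[String] {

  // Not serialised with the function; created on the task manager in open().
  @transient private var producer: KafkaProducer[String, String] = _

  override def open(parameters: Configuration): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "broker:9092") // hypothetical address
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    producer = new KafkaProducer[String, String](props)
  }

  override def filter(record: String): Boolean = {
    val valid = record.nonEmpty // hypothetical validity check
    if (!valid) {
      // Report malformed records to a hypothetical "errors" topic.
      producer.send(new ProducerRecord[String, String]("errors", record))
    }
    valid
  }

  override def close(): Unit = {
    if (producer != null) producer.close()
  }
}

You would then use it as dataStream.filter(new ErrorReportingFilter).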
Another option is a side output, as Arvid already mentioned, writing the diverted records to Kafka with a Kafka connector [1].
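Roughly, and again assuming a String stream and a hypothetical validity check, that would look like this; the side-output stream can then be given a Kafka sink from the connector in [1]:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

object SideOutputSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val input: DataStream[String] = env.fromElements("ok", "")

    // Tag identifying the side output for malformed records.
    val malformedTag = OutputTag[String]("malformed")

    val valid = input.process(new ProcessFunction[String, String] {
      override def processElement(record: String,
                                  ctx: ProcessFunction[String, String]#Context,
                                  out: Collector[String]): Unit = {
        if (record.nonEmpty) out.collect(record) // hypothetical validity check
        else ctx.output(malformedTag, record)    // divert malformed records
      }
    })

    // The side output is a regular DataStream; attach a Kafka sink here [1].
    val malformed: DataStream[String] = valid.getSideOutput(malformedTag)
    malformed.print()

    env.execute("side-output sketch")
  }
}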
Best,
Andrey
Hi Alex,
I'm not entirely sure what's happening; the snippets are not conclusive.
However, I'd recommend checking out side outputs [1] as a way to output malformed data.
Best,
Arvid
Hi all.
I use kafkaProducer in the DataStream filter, and it is null there. Could anyone explain this? When I debug locally, everything works as expected; the exception only occurs on the YARN cluster. Thanks.
Alex Fu.