How to write stream data to other Hadoop Cluster by StreamingFileSink

zhangjun

Hi all,

I have two Hadoop clusters (hdfs://mycluster1 and hdfs://mycluster2), both configured for HA.
I have a job that reads streaming data from Kafka and writes it to HDFS with StreamingFileSink. The job is deployed on mycluster1 (Flink on YARN), and I want it to write the data to mycluster2. How should I configure this? If I put hdfs://mycluster2/tmp/abc directly in the path of the StreamingFileSink, the job reports that mycluster2 cannot be found.


I looked at the source code of org.apache.flink.runtime.fs.hdfs.HadoopFsFactory#create. When Flink loads core-site.xml and hdfs-site.xml, it loads them first from hadoopConfig, then from flinkConfig, and finally from the classpath. In my case flinkConfig does not seem to be empty, so the configuration is loaded through flinkConfig and, in the end, from HADOOP_HOME. The core-site.xml and hdfs-site.xml found there belong to mycluster1 and contain no information about mycluster2, which is why mycluster2 cannot be found.


thanks
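
To illustrate the failure described in this post: an HA nameservice such as mycluster2 is a logical name that the HDFS client resolves through its own configuration, so a client that only sees mycluster1's files has no entry for it. The sketch below is a simplified model of that lookup, not Flink's or Hadoop's actual code. The class `NameserviceCheck` and the method `isDeclared` are hypothetical names used only for illustration.

```java
import java.net.URI;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, for illustration only: a simplified model of why
// hdfs://mycluster2/... fails when only mycluster1's configuration is loaded.
class NameserviceCheck {

    // Returns true if the authority of the given hdfs:// path is listed
    // in the comma-separated dfs.nameservices value of the configuration.
    static boolean isDeclared(Map<String, String> conf, String path) {
        String authority = URI.create(path).getHost();
        String declared = conf.getOrDefault("dfs.nameservices", "");
        return Arrays.stream(declared.split(","))
                .map(String::trim)
                .anyMatch(ns -> ns.equals(authority));
    }

    public static void main(String[] args) {
        String target = "hdfs://mycluster2/tmp/abc";

        // Configuration as seen on mycluster1: only its own nameservice is known.
        Map<String, String> cluster1Only = new HashMap<>();
        cluster1Only.put("dfs.nameservices", "mycluster1");

        // Configuration that declares both nameservices.
        Map<String, String> both = new HashMap<>();
        both.put("dfs.nameservices", "mycluster1,mycluster2");

        System.out.println(isDeclared(cluster1Only, target)); // false
        System.out.println(isDeclared(both, target));         // true
    }
}
```

Whichever configuration source ends up being used inside the job, it has to declare mycluster2 for the path to resolve.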
Re: How to write stream data to other Hadoop Cluster by StreamingFileSink

Yang Wang
Hi Jun Zhang,

I think you could add the configurations of both HDFS clusters to your hdfs-site.xml.
The following config keys need to be added. Then you can use both HDFS clusters in your Flink job.
dfs.nameservices: mycluster1,mycluster2

dfs.ha.namenodes.mycluster1: nn1,nn2
dfs.client.failover.proxy.provider.mycluster1: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
dfs.namenode.rpc-address.mycluster1.nn1: nn1-address
dfs.namenode.rpc-address.mycluster1.nn2: nn2-address

dfs.ha.namenodes.mycluster2: nn1,nn2
dfs.client.failover.proxy.provider.mycluster2: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
dfs.namenode.rpc-address.mycluster2.nn1: nn1-address
dfs.namenode.rpc-address.mycluster2.nn2: nn2-address
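
As a rough sketch of how the keys above fit together, the following Java snippet builds them for any number of nameservices and renders them as `<property>` entries in the hdfs-site.xml layout. The class `HaClientConfig` and its methods are hypothetical helpers, and `nn1-address` / `nn2-address` are placeholders for the real NameNode RPC addresses, as in the list above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, for illustration only: generates the HA client keys
// listed above and prints them as hdfs-site.xml <property> entries.
class HaClientConfig {

    static final String PROXY_PROVIDER =
            "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider";

    // Convenience builder for the two NameNodes (nn1, nn2) of one nameservice.
    static Map<String, String> namenodes(String nn1Address, String nn2Address) {
        Map<String, String> nns = new LinkedHashMap<>();
        nns.put("nn1", nn1Address);
        nns.put("nn2", nn2Address);
        return nns;
    }

    // nameservices maps each nameservice name to {NameNode id -> RPC address}.
    static Map<String, String> build(Map<String, Map<String, String>> nameservices) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("dfs.nameservices", String.join(",", nameservices.keySet()));
        for (Map.Entry<String, Map<String, String>> ns : nameservices.entrySet()) {
            String name = ns.getKey();
            props.put("dfs.ha.namenodes." + name, String.join(",", ns.getValue().keySet()));
            props.put("dfs.client.failover.proxy.provider." + name, PROXY_PROVIDER);
            for (Map.Entry<String, String> nn : ns.getValue().entrySet()) {
                props.put("dfs.namenode.rpc-address." + name + "." + nn.getKey(), nn.getValue());
            }
        }
        return props;
    }

    // Renders the properties as XML. Values are written as-is (no XML escaping),
    // which is sufficient for plain names and host:port addresses.
    static String toXml(Map<String, String> props) {
        StringBuilder sb = new StringBuilder("<configuration>\n");
        for (Map.Entry<String, String> e : props.entrySet()) {
            sb.append("  <property>\n")
              .append("    <name>").append(e.getKey()).append("</name>\n")
              .append("    <value>").append(e.getValue()).append("</value>\n")
              .append("  </property>\n");
        }
        return sb.append("</configuration>\n").toString();
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> nameservices = new LinkedHashMap<>();
        // Placeholders: replace with the real RPC addresses of each cluster.
        nameservices.put("mycluster1", namenodes("nn1-address", "nn2-address"));
        nameservices.put("mycluster2", namenodes("nn1-address", "nn2-address"));
        System.out.print(toXml(build(nameservices)));
    }
}
```

Each nameservice gets its own `dfs.ha.namenodes`, failover proxy provider, and per-NameNode `rpc-address` entries, while `dfs.nameservices` lists all of them once.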


Best,
Yang

Jun Zhang <[hidden email]> wrote on Saturday, October 5, 2019 at 1:45 PM:

Re: How to write stream data to other Hadoop Cluster by StreamingFileSink

zhangjun

Hi Yang,
Thank you very much for your reply.

I have added the configuration on my Hadoop cluster client. Both hdfs-site.xml and core-site.xml are configured, and the client can read both mycluster1 and mycluster2. But when I submit the Flink job to the YARN cluster, the Hadoop client configuration does not take effect. From reading the source code, it gives priority to the configuration of the Hadoop cluster.


On 10/9/2019 10:57, [hidden email] wrote: