Re: [Flink-9407] Question about proposed ORC Sink !


zhangminglei
Hi, Sagar.

1. It solves the issue partially meaning files which have finished checkpointing don't show .pending status but the files which were in progress 
    when the program exists are still in .pending state.

Ans: 

Yes. If the program exits before a checkpoint has completed, those files stay in the .pending state. Under normal circumstances, a program running in a production environment is never stopped or exited if everything is fine.

2. Ideally, writer should work with default settings correct ? Meaning we don't have to explicitly set these parameters to make it work. 
    Is this assumption correct ?

Ans: 

Yes. The writer should work with the default settings.
Yes. You do not have to explicitly set these parameters to make it work.
Yes. The assumption is correct.

However, Flink is a real-time streaming framework, so in practice you rarely use the default settings for a specific business case, especially when working together with an offline system (like Hadoop MapReduce). In that case, you need to tell the offline side when a bucket is closed and when the data for that bucket is ready. Please take a look at https://issues.apache.org/jira/browse/FLINK-9609.
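
One common convention for telling an offline job that a bucket is ready is to drop an empty marker file into the bucket directory once its part files are finished. A minimal sketch of that idea with a hypothetical helper class (this is not the FLINK-9609 implementation; only the Hadoop FileSystem calls are real API):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical helper: write an empty _SUCCESS marker so a downstream
// MapReduce/Hive job knows the bucket's data is complete.
public class BucketReadyMarker {
    public static void markBucketReady(String bucketDir) throws Exception {
        Configuration conf = new Configuration();
        Path marker = new Path(bucketDir, "_SUCCESS");
        FileSystem fs = FileSystem.get(marker.toUri(), conf);
        fs.create(marker, true).close(); // create (or overwrite) the empty marker file
    }
}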

Cheers
Zhangminglei


On Jun 23, 2018, at 8:23 AM, sagar loke <[hidden email]> wrote:

Hi Zhangminglei,

Thanks for the reply.

1. It solves the issue partially meaning files which have finished checkpointing don't show .pending status but the files which were in progress 
    when the program exists are still in .pending state.

2. Ideally, writer should work with default settings correct ? Meaning we don't have to explicitly set these parameters to make it work. 
    Is this assumption correct ?

Thanks,
Sagar

On Fri, Jun 22, 2018 at 3:25 AM, zhangminglei <[hidden email]> wrote:
Hi, Sagar. Please use the code below and you will see the part files go from _part-0-107.in-progress to _part-0-107.pending and finally to part-0-107 (for example); you need to run the program for a while. However, we need to set some parameters, like the following, and enableCheckpointing is also needed. I know why you always see the .pending files: the default value of the parameters below is 60 seconds, even when you enable checkpointing. That is why you cannot see the finished file status until 60 seconds have passed.

Attached is the result on my end, and you will see what you want!

Please let me know if you still have the problem.

Cheers
Zhangminglei

setInactiveBucketCheckInterval(2000)
.setInactiveBucketThreshold(2000);

import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.types.Row;
// plus OrcFileWriter from the FLINK-9407 PR

public class TestOrc {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Checkpointing is required, otherwise pending files are never moved to finished.
        env.enableCheckpointing(1000);
        env.setStateBackend(new MemoryStateBackend());

        String orcSchemaString = "struct<name:string,age:int,married:boolean>";
        String path = "hdfs://10.199.196.0:9000/data/hive/man";

        BucketingSink<Row> bucketingSink = new BucketingSink<>(path);

        bucketingSink
                .setWriter(new OrcFileWriter<>(orcSchemaString))
                // Defaults are 60 seconds; lowering them lets inactive buckets close sooner.
                .setInactiveBucketCheckInterval(2000)
                .setInactiveBucketThreshold(2000);

        DataStream<Row> dataStream = env.addSource(new ManGenerator());

        dataStream.addSink(bucketingSink);

        env.execute();
    }

    public static class ManGenerator implements SourceFunction<Row> {

        @Override
        public void run(SourceContext<Row> ctx) throws Exception {
            for (int i = 0; i < 2147483000; i++) {
                Row row = new Row(3);
                row.setField(0, "Sagar");
                row.setField(1, 26 + i);
                row.setField(2, false);
                ctx.collect(row);
            }
        }

        @Override
        public void cancel() {
        }
    }
}
<filestatus.jpg>



On Jun 22, 2018, at 11:14 AM, sagar loke <[hidden email]> wrote:

Sure, we can solve it together :)

Are you able to reproduce it ?

Thanks,
Sagar

On Thu, Jun 21, 2018 at 7:28 PM zhangminglei <[hidden email]> wrote:
Sagar, flush will be called when a checkpoint is taken. Please see:

bucketState.currentFileValidLength = bucketState.writer.flush();


@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    Preconditions.checkNotNull(restoredBucketStates, "The operator has not been properly initialized.");

    restoredBucketStates.clear();

    synchronized (state.bucketStates) {
        int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();

        for (Map.Entry<String, BucketState<T>> bucketStateEntry : state.bucketStates.entrySet()) {
            BucketState<T> bucketState = bucketStateEntry.getValue();

            // flush() is called for every open writer on each checkpoint;
            // its return value is recorded as the valid length of the current file.
            if (bucketState.isWriterOpen) {
                bucketState.currentFileValidLength = bucketState.writer.flush();
            }

            synchronized (bucketState.pendingFilesPerCheckpoint) {
                bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles);
            }
            bucketState.pendingFiles = new ArrayList<>();
        }
        restoredBucketStates.add(state);

        if (LOG.isDebugEnabled()) {
            LOG.debug("{} idx {} checkpointed {}.", getClass().getSimpleName(), subtaskIdx, state);
        }
    }
}


On Jun 22, 2018, at 10:21 AM, sagar loke <[hidden email]> wrote:

Thanks for replying. 

Yes, I tried different checkpoint intervals, e.g. 20, 100, 5000:

env.enableCheckpointing(100);

But in all cases I still see the .pending state.

Not sure if it's related to the flush() method of OrcFileWriter, which might not be getting called somehow?

Thanks,
Sagar

On Thu, Jun 21, 2018 at 7:17 PM zhangminglei <[hidden email]> wrote:
Hi, Sagar

Please take a look at BucketingSink. It says that a file keeps the .pending status if you do NOT take a checkpoint; the doc says that when a checkpoint is successful, the currently pending files will be moved to {@code finished}.
Try again. I think you should call the method below and see what happens. Anyway, I will also try it and see whether it works. Please let me know if you still hit the error.

 env.enableCheckpointing(200);

/**
 * The suffix for {@code pending} part files. These are closed files that we are
 * not currently writing to (inactive or reached {@link #batchSize}), but which
 * were not yet confirmed by a checkpoint.
 */
private static final String DEFAULT_PENDING_SUFFIX = ".pending";

/**
 * <p>Part files can be in one of three states: {@code in-progress}, {@code pending} or {@code finished}.
 * The reason for this is how the sink works together with the checkpointing mechanism to provide exactly-once
 * semantics and fault-tolerance. The part file that is currently being written to is {@code in-progress}. Once
 * a part file is closed for writing it becomes {@code pending}. When a checkpoint is successful the currently
 * pending files will be moved to {@code finished}.
 */
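
These prefixes and suffixes are also configurable on the sink. A minimal sketch, assuming the setter methods of BucketingSink in flink-connector-filesystem and reusing the imports from the TestOrc example above (the values shown are the defaults):

BucketingSink<Row> sink = new BucketingSink<>("hdfs:///data/out");
sink.setInProgressSuffix(".in-progress")  // files currently being written
    .setPendingSuffix(".pending")         // closed, waiting for a checkpoint
    .setPartPrefix("part");               // prefix of finished part files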

Cheers
Zhangminglei



On Jun 22, 2018, at 4:46 AM, sagar loke <[hidden email]> wrote:

Thanks Zhangminglei for the quick response.

I tried the above code and I am seeing another issue where the files created on HDFS are always in the .pending state.

Let me know if you can reproduce it?

Thanks,
Sagar

On Thu, Jun 21, 2018 at 3:20 AM, zhangminglei <[hidden email]> wrote:
Hi, Sagar

I did a local test and it seems to work fine for me. The PR for [FLINK-9407] will be updated.

I will push the newest code to the PR soon; below is the example I used for my test. You can check it again. Hope you enjoy it!

Cheers
Zhangminglei.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.types.Row;

public class TestOrc {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        String orcSchemaString = "struct<name:string,age:int,married:boolean>";
        String path = "hdfs://10.199.196.0:9000/data/hive/man";

        BucketingSink<Row> bucketingSink = new BucketingSink<>(path);

        bucketingSink.setWriter(new OrcFileWriter<>(orcSchemaString));

        DataStream<Row> dataStream = env.addSource(new ManGenerator());

        dataStream.addSink(bucketingSink);

        env.execute();
    }

    public static class ManGenerator implements SourceFunction<Row> {

        @Override
        public void run(SourceContext<Row> ctx) throws Exception {
            for (int i = 0; i < 3; i++) {
                Row row = new Row(3);
                row.setField(0, "Sagar");
                row.setField(1, 26 + i);
                row.setField(2, false);
                ctx.collect(row);
            }
        }

        @Override
        public void cancel() {
        }
    }
}


On Jun 21, 2018, at 1:47 AM, sagar loke <[hidden email]> wrote:

Hi Zhangminglei,


I tried to use the code from the PR and ran it on a local HDFS cluster to write some ORC data.

But somehow this code is failing with the following error:

org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): Failed to CREATE_FILE /tmp/hivedatanew2/2018-06-20/10/34/_part-0-0.in-progress for DFSClient_NONMAPREDUCE_73219864_36 on 127.0.0.1 because this file lease is currently owned by DFSClient_NONMAPREDUCE_-1374584007_36 on 127.0.0.1
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2500)

I understand that this error is related to Hadoop, but somehow I get it only when executing the code from this PR.

I had created a very crude way to write ORC files to HDFS, as follows. The code below works fine and does not throw the above error.

import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.hadoop.conf.Configuration;

import java.io.IOException;

public class FlinkOrcWriterV1<T> implements org.apache.flink.streaming.connectors.fs.Writer<T> {

    private transient org.apache.orc.Writer orcWriter;
    String schema;
    TypeDescription typeDescriptionschema; // e.g. "struct<x:int,y:int>"
    String basePath;

    public FlinkOrcWriterV1(String schema) {
        this.schema = schema;
        this.typeDescriptionschema = TypeDescription.fromString(schema);
    }

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        // Note: ignores the path passed in by the sink and always writes to a fixed location.
        Configuration conf = new Configuration();
        orcWriter = OrcFile.createWriter(new Path("hdfs://localhost:9000/tmp/hivedata3/"),
                OrcFile.writerOptions(conf)
                        .setSchema(typeDescriptionschema));
    }

    @Override
    public long flush() throws IOException {
        return orcWriter.writeIntermediateFooter();
    }

    @Override
    public long getPos() throws IOException {
        return orcWriter.getRawDataSize();
    }

    @Override
    public void close() throws IOException {
        orcWriter.close();
    }

    @Override
    public void write(T element) throws IOException {
        // Crude test code: the incoming element is ignored and a fixed batch of 10 rows is written per call.
        VectorizedRowBatch batch = typeDescriptionschema.createRowBatch(10);
        LongColumnVector x = (LongColumnVector) batch.cols[0];
        LongColumnVector y = (LongColumnVector) batch.cols[1];
        for (int r = 0; r < 10; ++r) {
            int row = batch.size++;
            x.vector[row] = r;
            y.vector[row] = r * 3;
            // If the batch is full, write it out and start over.
            if (batch.size == batch.getMaxSize()) {
                orcWriter.addRowBatch(batch);
                batch.reset();
            }
        }
        if (batch.size != 0) {
            orcWriter.addRowBatch(batch);
            batch.reset();
        }
    }

    @Override
    public FlinkOrcWriterV1<T> duplicate() {
        return new FlinkOrcWriterV1<>(schema);
    }
}



Not sure if the error is related to any of the Hadoop dependencies or something else?

Can you please look into it and let me know if you can reproduce it on your end too?

By the way, the following are the dependencies in my project:

dependencies {
    compile 'org.apache.flink:flink-java:1.4.2'
    compile 'org.apache.flink:flink-runtime_2.11:1.4.2'
    compile 'org.apache.flink:flink-streaming-java_2.11:1.4.2'
    compile 'org.apache.flink:flink-connector-kafka-0.11_2.11:1.4.2'
    compile 'org.apache.flink:flink-connector-elasticsearch5_2.11:1.4.2'
    compile 'io.confluent:kafka-avro-serializer:3.3.0'
    compile 'org.apache.flink:flink-avro:1.4.2'
    compile group: 'org.apache.kafka', name: 'kafka_2.11', version: '1.1.0'
    compile group: 'org.apache.flink', name: 'flink-shaded-hadoop2', version: '1.4.2'
    compile 'org.apache.flink:flink-connector-filesystem_2.11:1.4.2'
    compile group: 'org.apache.flink', name: 'flink-jdbc', version: '1.4.2'
    compile group: 'org.apache.flink', name: 'flink-table_2.11', version: '1.4.2'
    compile group: 'org.apache.orc', name: 'orc-core', version: '1.5.1'
    compile group: 'org.apache.parquet', name: 'parquet-avro', version: '1.10.0'
    compile group: 'org.apache.parquet', name: 'parquet-common', version: '1.10.0'
    compile group: 'org.apache.flink', name: 'flink-orc_2.11', version: '1.4.2'
    testCompile group: 'junit', name: 'junit', version: '4.12'
}


--
Thanks,
Sagar.




--
Regards,
SAGAR.

--
Cheers,
Sagar

--
Cheers,
Sagar




--
Regards,
SAGAR.


Ted Yu
For #1, the word exist should be exit, right?
Thanks

-------- Original message --------
From: zhangminglei <[hidden email]>
Date: 6/23/18 10:12 AM (GMT+08:00)
To: sagar loke <[hidden email]>
Cc: dev <[hidden email]>, user <[hidden email]>
Subject: Re: [Flink-9407] Question about proposed ORC Sink !



zhangminglei
Yes, it should be exit. Thanks, Ted Yu. Exactly right!

Cheers
Zhangminglei

On Jun 23, 2018, at 12:40 PM, Ted Yu <[hidden email]> wrote:

For #1, the word exist should be exit, right?
Thanks

-------- Original message --------
From: zhangminglei <[hidden email]>
Date: 6/23/18 10:12 AM (GMT+08:00)
To: sagar loke <[hidden email]>
Cc: dev <[hidden email]>, user <[hidden email]>
Subject: Re: [Flink-9407] Question about proposed ORC Sink !




sagar loke
@zhangminglei,

Question about the schema for ORC format:

1. Does it always need to be of complex type "<Struct>" ?

2. Or can it be created with individual data types directly ? 
    eg. "name:string, age:int" ? 


Thanks,
Sagar

On Fri, Jun 22, 2018 at 11:56 PM, zhangminglei <[hidden email]> wrote:
Yes, it should be exit. Thanks, Ted Yu. Exactly right!

Cheers
Zhangminglei






--
Regards,
SAGAR.

Fabian Hueske-2
Hi Sagar,

That's more a question for the ORC community, but AFAIK, the top-level type is always a struct because it needs to wrap the fields, e.g., struct<name:string,age:int>.
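
A minimal sketch of that with the org.apache.orc TypeDescription API (the schema string below is an assumed example, in the same syntax used earlier in this thread):

import org.apache.orc.TypeDescription;

public class OrcSchemaExample {
    public static void main(String[] args) {
        // The schema string is always a top-level struct that wraps the fields.
        TypeDescription fromString = TypeDescription.fromString("struct<name:string,age:int>");

        // The equivalent programmatic form also starts from a struct.
        TypeDescription built = TypeDescription.createStruct()
                .addField("name", TypeDescription.createString())
                .addField("age", TypeDescription.createInt());

        System.out.println(built.toString()); // prints struct<name:string,age:int>
    }
}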

Best, Fabian

2018-06-26 22:38 GMT+02:00 sagar loke <[hidden email]>:
@zhangminglei,

Question about the schema for ORC format:

1. Does it always need to be of complex type "<Struct>" ?

2. Or can it be created with individual data types directly ? 
    eg. "name:string, age:int" ? 


Thanks,
Sagar

On Fri, Jun 22, 2018 at 11:56 PM, zhangminglei <[hidden email]> wrote:
Yes, it should be exit. Thanks to Ted Yu. Very exactly! 

Cheers
Zhangminglei

在 2018年6月23日,下午12:40,Ted Yu <[hidden email]> 写道:

For #1, the word exist should be exit, right ?
Thanks

-------- Original message --------
From: zhangminglei <[hidden email]>
Date: 6/23/18 10:12 AM (GMT+08:00)
To: sagar loke <[hidden email]>
Cc: dev <[hidden email]>, user <[hidden email]>
Subject: Re: [Flink-9407] Question about proposed ORC Sink !

Hi, Sagar.

1. It solves the issue partially meaning files which have finished checkpointing don't show .pending status but the files which were in progress 
    when the program exists are still in .pending state.

Ans: 

Yea, Make the program exists and in that time if a checkpoint does not finished will lead the status keeps in .pending state then. Under the normal circumstances, the programs that running in the production env will never be stoped or existed if everything is fine.

2. Ideally, writer should work with default settings correct ? Meaning we don't have to explicitly set these parameters to make it work. 
    Is this assumption correct ?

Ans: 

Yes. Writer should work with default settings correct.
Yes. We do not have to explicitly set these parameters to make it work.
Yes. Assumption correct indeed.

However, you know, flink is a real time streaming framework, so under normal circumstances,you don't really go to use the default settings when it comes to a specific business. Especially together work with offline end(Like hadoop mapreduce). In this case, you need to tell the offline end when time a bucket is close and when time the data for the specify bucket is ready. So, you can take a look on https://issues.apache.org/jira/browse/FLINK-9609.

Cheers
Zhangminglei


在 2018年6月23日,上午8:23,sagar loke <[hidden email]> 写道:

Hi Zhangminglei,

Thanks for the reply.

1. It solves the issue partially meaning files which have finished checkpointing don't show .pending status but the files which were in progress 
    when the program exists are still in .pending state.

2. Ideally, writer should work with default settings correct ? Meaning we don't have to explicitly set these parameters to make it work. 
    Is this assumption correct ?

Thanks,
Sagar

On Fri, Jun 22, 2018 at 3:25 AM, zhangminglei <[hidden email]> wrote:
Hi, Sagar. Please use the below code and you will find the part files status from _part-0-107.in-progress   to _part-0-107.pending and finally to part-0-107. [For example], you need to run the program for a while. However, we need set some parameters, like the following. Moreover, enableCheckpointing IS also needed. I know why you always see the .pending file since the below parameters default value is 60 seconds even though you set the enableCheckpoint. So, that is why you can not see the finished file status until 60 seconds passed.

Attached is the ending on my end, and you will see what you want! 

Please let me know if you still have the problem.

Cheers
Zhangminglei

.setInactiveBucketCheckInterval(2000)
.setInactiveBucketThreshold(2000);

import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.types.Row;

public class TestOrc {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(1000);
        env.setStateBackend(new MemoryStateBackend());

        String orcSchemaString = "struct<name:string,age:int,married:boolean>";
        String path = "hdfs://10.199.196.0:9000/data/hive/man";

        BucketingSink<Row> bucketingSink = new BucketingSink<>(path);

        bucketingSink
                .setWriter(new OrcFileWriter<>(orcSchemaString)) // OrcFileWriter comes from the FLINK-9407 PR
                .setInactiveBucketCheckInterval(2000)
                .setInactiveBucketThreshold(2000);

        DataStream<Row> dataStream = env.addSource(new ManGenerator());

        dataStream.addSink(bucketingSink);

        env.execute();
    }

    public static class ManGenerator implements SourceFunction<Row> {

        @Override
        public void run(SourceContext<Row> ctx) throws Exception {
            for (int i = 0; i < 2147483000; i++) {
                Row row = new Row(3);
                row.setField(0, "Sagar");
                row.setField(1, 26 + i);
                row.setField(2, false);
                ctx.collect(row);
            }
        }

        @Override
        public void cancel() {
        }
    }
}
<filestatus.jpg>



On Jun 22, 2018, at 11:14 AM, sagar loke <[hidden email]> wrote:

Sure, we can solve it together :)

Are you able to reproduce it ?

Thanks,
Sagar

On Thu, Jun 21, 2018 at 7:28 PM zhangminglei <[hidden email]> wrote:
Sagar, flush() will be called when a checkpoint is performed. Please see

bucketState.currentFileValidLength = bucketState.writer.flush();


@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    Preconditions.checkNotNull(restoredBucketStates, "The operator has not been properly initialized.");

    restoredBucketStates.clear();

    synchronized (state.bucketStates) {
        int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();

        for (Map.Entry<String, BucketState<T>> bucketStateEntry : state.bucketStates.entrySet()) {
            BucketState<T> bucketState = bucketStateEntry.getValue();

            // flush() is invoked here for every open writer when a checkpoint is taken
            if (bucketState.isWriterOpen) {
                bucketState.currentFileValidLength = bucketState.writer.flush();
            }

            synchronized (bucketState.pendingFilesPerCheckpoint) {
                bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles);
            }
            bucketState.pendingFiles = new ArrayList<>();
        }
        restoredBucketStates.add(state);

        if (LOG.isDebugEnabled()) {
            LOG.debug("{} idx {} checkpointed {}.", getClass().getSimpleName(), subtaskIdx, state);
        }
    }
}


On Jun 22, 2018, at 10:21 AM, sagar loke <[hidden email]> wrote:

Thanks for replying. 

Yes, I tried with different checkpoint intervals, e.g. 20, 100, 5000:

env.enableCheckpointing(100);

But in all the cases, I still see .pending state. 

Not sure if it's related to the flush() method from OrcFileWriter, which might not be getting called somehow?

Thanks,
Sagar
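One quick way to check whether flush() is actually being invoked is to wrap the writer in a delegating logger (a minimal sketch against the Writer interface shown further down in this thread; the LoggingWriter class and its log output are illustrative assumptions):

import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

/** Delegating wrapper that logs every flush() call of the wrapped writer. */
public class LoggingWriter<T> implements Writer<T> {

    private final Writer<T> delegate;

    public LoggingWriter(Writer<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        delegate.open(fs, path);
    }

    @Override
    public long flush() throws IOException {
        long validLength = delegate.flush();
        System.out.println("flush() called, valid length = " + validLength);
        return validLength;
    }

    @Override
    public long getPos() throws IOException {
        return delegate.getPos();
    }

    @Override
    public void write(T element) throws IOException {
        delegate.write(element);
    }

    @Override
    public void close() throws IOException {
        delegate.close();
    }

    @Override
    public Writer<T> duplicate() {
        return new LoggingWriter<>(delegate.duplicate());
    }
}

It would be registered the same way as the original writer, e.g. bucketingSink.setWriter(new LoggingWriter<>(new OrcFileWriter<>(orcSchemaString))).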

On Thu, Jun 21, 2018 at 7:17 PM zhangminglei <[hidden email]> wrote:
Hi, Sagar

Please take a look at BucketingSink. It says that a file will keep the .pending status if you DO NOT do a checkpoint. The doc says that when a checkpoint is successful the currently pending files will be moved to {@code finished}.
Try again: I think you should call the method below and see what happens. Anyway, I will also try that and see whether it works. Please let me know if you still hit the error.

 env.enableCheckpointing(200);

/**
 * The suffix for {@code pending} part files. These are closed files that we are
 * not currently writing to (inactive or reached {@link #batchSize}), but which
 * were not yet confirmed by a checkpoint.
 */
private static final String DEFAULT_PENDING_SUFFIX = ".pending";

/**
 * <p>Part files can be in one of three states: {@code in-progress}, {@code pending} or {@code finished}.
 * The reason for this is how the sink works together with the checkpointing mechanism to provide exactly-once
 * semantics and fault-tolerance. The part file that is currently being written to is {@code in-progress}. Once
 * a part file is closed for writing it becomes {@code pending}. When a checkpoint is successful the currently
 * pending files will be moved to {@code finished}.
 */

Cheers
Zhangminglei



On Jun 22, 2018, at 4:46 AM, sagar loke <[hidden email]> wrote:

Thanks Zhangminglei for quick response.

I tried the above code and I am seeing another issue where the files created on HDFS are always in the .pending state.

Let me know if you can reproduce it.

Thanks,
Sagar

On Thu, Jun 21, 2018 at 3:20 AM, zhangminglei <[hidden email]> wrote:
Hi, Sagar

I did a local test for that and it seems to work fine for me. The PR for [FLINK-9407] will be updated.

I will push the newest code to the PR soon; below is the example I used for my test. You can check it again. Hope you enjoy it!

Cheers
Zhangminglei.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.types.Row;

public class TestOrc {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        String orcSchemaString = "struct<name:string,age:int,married:boolean>";
        String path = "hdfs://10.199.196.0:9000/data/hive/man";

        BucketingSink<Row> bucketingSink = new BucketingSink<>(path);

        bucketingSink.setWriter(new OrcFileWriter<>(orcSchemaString));

        DataStream<Row> dataStream = env.addSource(new ManGenerator());

        dataStream.addSink(bucketingSink);

        env.execute();
    }

    public static class ManGenerator implements SourceFunction<Row> {

        @Override
        public void run(SourceContext<Row> ctx) throws Exception {
            for (int i = 0; i < 3; i++) {
                Row row = new Row(3);
                row.setField(0, "Sagar");
                row.setField(1, 26 + i);
                row.setField(2, false);
                ctx.collect(row);
            }
        }

        @Override
        public void cancel() {
        }
    }
}


On Jun 21, 2018, at 1:47 AM, sagar loke <[hidden email]> wrote:

Hi Zhangminglei,


I tried to use the code from the PR and ran it on a local HDFS cluster to write some ORC data.

But somehow this code is failing with the following error:

 
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): Failed to CREATE_FILE /tmp/hivedatanew2/2018-06-20/10/34/_part-0-0.in-progress for DFSClient_NONMAPREDUCE_73219864_36 on 127.0.0.1 because this file lease is currently owned by DFSClient_NONMAPREDUCE_-1374584007_36 on 127.0.0.1
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2500)

I understand that this error is related to Hadoop but somehow I get this error only when executing the code from this PR.

I had created a very crude way to write an ORC file to HDFS, as follows. The code below works alright and does not throw the above error.

import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;

import java.io.IOException;

public class FlinkOrcWriterV1<T> implements Writer<T> {

    private transient org.apache.orc.Writer orcWriter;
    String schema;
    TypeDescription typeDescriptionschema; // e.g. "struct<x:int,y:int>"
    String basePath;

    public FlinkOrcWriterV1(String schema) {
        this.schema = schema;
        this.typeDescriptionschema = TypeDescription.fromString(schema);
    }

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        // Note: ignores the fs/path handed in by BucketingSink and writes to a fixed location.
        Configuration conf = new Configuration();
        orcWriter = OrcFile.createWriter(new Path("hdfs://localhost:9000/tmp/hivedata3/"),
                OrcFile.writerOptions(conf)
                        .setSchema(typeDescriptionschema));
    }

    @Override
    public long flush() throws IOException {
        return orcWriter.writeIntermediateFooter();
    }

    @Override
    public long getPos() throws IOException {
        return orcWriter.getRawDataSize();
    }

    @Override
    public void close() throws IOException {
        orcWriter.close();
    }

    @Override
    public void write(T element) throws IOException {
        // Note: ignores the incoming element and writes dummy rows for the "struct<x:int,y:int>" schema.
        VectorizedRowBatch batch = typeDescriptionschema.createRowBatch(10);
        LongColumnVector x = (LongColumnVector) batch.cols[0];
        LongColumnVector y = (LongColumnVector) batch.cols[1];
        for (int r = 0; r < 10; ++r) {
            int row = batch.size++;
            x.vector[row] = r;
            y.vector[row] = r * 3;
            // If the batch is full, write it out and start over.
            if (batch.size == batch.getMaxSize()) {
                orcWriter.addRowBatch(batch);
                batch.reset();
            }
        }
        if (batch.size != 0) {
            orcWriter.addRowBatch(batch);
            batch.reset();
        }
    }

    @Override
    public FlinkOrcWriterV1<T> duplicate() {
        return new FlinkOrcWriterV1<>(schema);
    }
}
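If it helps, this is how the crude writer above would be plugged into a BucketingSink (a usage sketch only; the FlinkOrcWriterV1Usage class name is an assumption, while the path and schema are the ones used in the code above):

import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.types.Row;

public class FlinkOrcWriterV1Usage {

    /**
     * Builds a BucketingSink that uses the crude FlinkOrcWriterV1 above.
     * Wire it to any DataStream<Row> with dataStream.addSink(buildSink()).
     */
    public static BucketingSink<Row> buildSink() {
        BucketingSink<Row> sink = new BucketingSink<>("hdfs://localhost:9000/tmp/hivedata3/");
        sink.setWriter(new FlinkOrcWriterV1<>("struct<x:int,y:int>"));
        return sink;
    }
}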



Not sure if the error is related to any of the Hadoop dependencies or something else?

Can you please look into it and let me know if you can reproduce it on your end too?

By the way, the following are the dependencies in my project:

dependencies {
    compile 'org.apache.flink:flink-java:1.4.2'
    compile 'org.apache.flink:flink-runtime_2.11:1.4.2'
    compile 'org.apache.flink:flink-streaming-java_2.11:1.4.2'
    compile 'org.apache.flink:flink-connector-kafka-0.11_2.11:1.4.2'
    compile 'org.apache.flink:flink-connector-elasticsearch5_2.11:1.4.2'
    compile 'io.confluent:kafka-avro-serializer:3.3.0'
    compile 'org.apache.flink:flink-avro:1.4.2'
    compile group: 'org.apache.kafka', name: 'kafka_2.11', version: '1.1.0'
    compile group: 'org.apache.flink', name: 'flink-shaded-hadoop2', version: '1.4.2'
    compile 'org.apache.flink:flink-connector-filesystem_2.11:1.4.2'
    compile group: 'org.apache.flink', name: 'flink-jdbc', version: '1.4.2'
    compile group: 'org.apache.flink', name: 'flink-table_2.11', version: '1.4.2'
    compile group: 'org.apache.orc', name: 'orc-core', version: '1.5.1'
    compile group: 'org.apache.parquet', name: 'parquet-avro', version: '1.10.0'
    compile group: 'org.apache.parquet', name: 'parquet-common', version: '1.10.0'
    compile group: 'org.apache.flink', name: 'flink-orc_2.11', version: '1.4.2'
    testCompile group: 'junit', name: 'junit', version: '4.12'
}


--
Thanks,
Sagar.






Re: [Flink-9407] Question about proposed ORC Sink !

sagar loke
Thanks @zhangminglei and @Fabian for confirming.

I also looked at the ORC parsing code, and it seems that using the <struct> type is mandatory for now.

Thanks,
Sagar





--
Regards,
SAGAR.