Implementing a TarInputFormat based on FileInputFormat

Implementing a TarInputFormat based on FileInputFormat

Billy Bain
I am trying to implement a class that will work similar to AvroFileFormat. 

The tar archive has a very specific format: it contains only one file, and that file is line-delimited JSON.
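
To illustrate (these are made-up records, not the real schema), the single entry holds one JSON object per line:

{"packageName":"com.example.app","installs":1000}
{"packageName":"com.example.game","installs":250}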

I get this exception, even though all the data is written to the temporary files. I have verified that my code isn't closing the stream, which was my prior issue.

Caused by: java.nio.channels.ClosedChannelException
at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
at java.base/sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:325)
at org.apache.flink.core.fs.local.LocalRecoverableFsDataOutputStream.getPos(LocalRecoverableFsDataOutputStream.java:101)
at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter.getSize(OutputStreamBasedPartFileWriter.java:70)
at org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy.shouldRollOnCheckpoint(DefaultRollingPolicy.java:71)
at org.apache.flink.connector.file.sink.writer.FileWriterBucket.prepareCommit(FileWriterBucket.java:195)
at org.apache.flink.connector.file.sink.writer.FileWriter.prepareCommit(FileWriter.java:202)
at org.apache.flink.streaming.runtime.operators.sink.AbstractSinkWriterOperator.endInput(AbstractSinkWriterOperator.java:97)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$close$0(StreamOperatorWrapper.java:127)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper$$Lambda$343/000000000000000000.run(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:127)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:412)
at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:585)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:547)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.base/java.lang.Thread.run(Thread.java:836)

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TarInputFormat<E> extends FileInputFormat<E> implements ResultTypeQueryable<E> {

    private static final Logger logger = LoggerFactory.getLogger(TarInputFormat.class);
    private static final ObjectMapper objectMapper = new ObjectMapper();

    private transient TarArchiveInputStream tarArchiveInputStream;
    private transient TarArchiveEntry nextEntry;
    private final Class<E> valueType;
    private long currentPosition = 0L;

    public TarInputFormat(Path filePath, Class<E> valueType) {
        super(filePath);
        this.valueType = valueType;
        // A tar stream cannot be read from an arbitrary offset, so force a single split.
        this.unsplittable = true;
        this.setNumSplits(1);
    }

    @Override
    public TypeInformation<E> getProducedType() {
        return TypeExtractor.getForClass(this.valueType);
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split);
        // 'stream' is the split's input stream opened by FileInputFormat.
        tarArchiveInputStream = new TarArchiveInputStream(stream);
        nextEntry = tarArchiveInputStream.getNextTarEntry();
        currentPosition = 0L;
        logger.info("Entry Name={} size={}", nextEntry.getName(), nextEntry.getSize());
    }

    @Override
    public void close() throws IOException {
        super.close();
        if (tarArchiveInputStream != null) {
            tarArchiveInputStream.close();
        }
    }

    @Override
    public boolean reachedEnd() throws IOException {
        return nextEntry == null || currentPosition >= nextEntry.getSize();
    }

    @Override
    public E nextRecord(E reuse) throws IOException {
        if (reachedEnd()) {
            return null;
        }
        logger.info("currentPosition={}", currentPosition);
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        // Read one '\n'-delimited JSON record from the single tar entry.
        while (currentPosition < nextEntry.getSize()) {
            int c = tarArchiveInputStream.read();
            if (c == -1) {
                // Unexpected end of stream before the entry size was reached.
                break;
            }
            currentPosition++;
            if (c == '\n') {
                break;
            }
            bos.write(c);
        }
        return objectMapper.readValue(bos.toByteArray(), valueType);
    }
}
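
To sanity-check the input side in isolation, here is a minimal driver that I would expect to exercise the format directly, outside of any Flink job (the path is a placeholder and AndroidData is my Jackson-mapped POJO):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;

public class TarInputFormatCheck {
    public static void main(String[] args) throws Exception {
        TarInputFormat<AndroidData> format =
                new TarInputFormat<>(new Path("/path/to/playstore-00.tar.gz"), AndroidData.class);
        format.configure(new Configuration());
        // unsplittable + setNumSplits(1) should yield exactly one split
        FileInputSplit[] splits = format.createInputSplits(1);
        format.open(splits[0]);
        int count = 0;
        while (!format.reachedEnd() && format.nextRecord(null) != null) {
            count++;
        }
        format.close();
        System.out.println("read " + count + " records");
    }
}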


Thanks.

--
Wayne D. Young
aka Billy Bob Bain
[hidden email]

Re: Implementing a TarInputFormat based on FileInputFormat

Arvid Heise-3
Hi Billy,

the exception is happening on the output side. Input side looks fine. Could you maybe post more information about the sink?

--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   

Re: Implementing a TarInputFormat based on FileInputFormat

Billy Bain
Hi Arvid, 

Thanks for the response. I have created a sample application with input data and uploaded it to Google Drive. The sample data is in the archive, hence the large size (27 MB).


To run it:
flink run -Dexecution.runtime-mode=BATCH -c com.billybobbain.AndroidTarReader /path/to/flink-tar/build/libs/flink-tar-0.1.jar --input_path /path/to/flink-tar/playstore-00.tar.gz --output_path /path/to/output/

The main class:
public class AndroidTarReader {
    public static void main(String[] args) throws Exception {
        ParameterTool parameter = ParameterTool.fromArgs(args);
        String inputPath = parameter.get("input_path");
        String outputPath = parameter.get("output_path");
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<AndroidData> android = env.readFile(new TarInputFormat<>(new Path(inputPath), AndroidData.class), inputPath);
        final FileSink<AndroidData> sink = FileSink
                .forRowFormat(new Path(outputPath), new AndroidDataEncoder())
                .withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("filtered").withPartSuffix(".json").build())
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        .withMaxPartSize(1024 * 1024)
                        .build())
                .build();
        android.sinkTo(sink);
        env.execute("zMarket Android");
    }
}
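
For completeness, AndroidDataEncoder is just a thin Jackson wrapper; roughly (a sketch, assuming AndroidData is Jackson-serializable):

import java.io.IOException;
import java.io.OutputStream;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.Encoder;

public class AndroidDataEncoder implements Encoder<AndroidData> {
    private static final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void encode(AndroidData element, OutputStream stream) throws IOException {
        // One JSON object per line, mirroring the line-delimited input format.
        stream.write(objectMapper.writeValueAsBytes(element));
        stream.write('\n');
    }
}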


--
Wayne D. Young
aka Billy Bob Bain
[hidden email]

Re: Re: Implementing a TarInputFormat based on FileInputFormat

Yun Gao
Hi Billy,

    I checked the provided example and found that it appears to be a problem in the ContinuousFileReaderOperator, so I created an issue for it [1]. To work around it temporarily, you can disable chaining between the ContinuousFileReaderOperator and the downstream operators:

   ....
   android.disableChaining().sinkTo(sink);
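
   Applied to the sample job above, that would be (a sketch, untested):

   // Disable chaining between the file reader and the downstream sink so
   // they are closed as separate chains (workaround for FLINK-20888).
   DataStreamSource<AndroidData> android =
           env.readFile(new TarInputFormat<>(new Path(inputPath), AndroidData.class), inputPath);
   android.disableChaining().sinkTo(sink);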
   
Best,
 Yun

[1] https://issues.apache.org/jira/browse/FLINK-20888


