
Re: [Flink-9407] Question about proposed ORC Sink !


Thanks @zhangminglei and @Fabian for confirming.

I also looked at the ORC parsing code, and it seems that using the <struct>
type is mandatory for now.

Thanks,
Sagar

On Wed, Jun 27, 2018 at 12:59 AM, Fabian Hueske <fhueske@xxxxxxxxx> wrote:

> Hi Sagar,
>
> That's more a question for the ORC community, but AFAIK, the top-level
> type is always a struct because it needs to wrap the fields, e.g.,
> struct<name:string,age:int>.
>
> Best, Fabian
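To make Fabian's point concrete: a bare field list such as `name:string, age:int` has to be wrapped in a top-level struct before it can serve as an ORC schema (for example with `TypeDescription.fromString`, as in the writer code further down). The helper below is a hypothetical plain-Java sketch of that wrapping, limited to flat primitive fields; `OrcSchemas` is not part of ORC or Flink, and it does not check that the type names are valid ORC types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of ORC or Flink: wraps a comma-separated list
// of simple "name:type" fields in the top-level struct that ORC expects.
final class OrcSchemas {
    private OrcSchemas() {}

    static String wrapInStruct(String fieldList) {
        String trimmed = fieldList.trim();
        // Already wrapped: only strip whitespace so the result is canonical.
        if (trimmed.startsWith("struct<")) {
            return trimmed.replaceAll("\\s+", "");
        }
        List<String> fields = new ArrayList<>();
        for (String field : trimmed.split(",")) {
            String f = field.trim();
            // Only flat fields such as "age:int"; nested or parameterized types
            // like decimal(10,2) are out of scope for this sketch.
            if (!f.matches("[A-Za-z_][A-Za-z0-9_]*:[a-z]+")) {
                throw new IllegalArgumentException("Unsupported field: " + f);
            }
            fields.add(f);
        }
        return "struct<" + String.join(",", fields) + ">";
    }
}
```

For example, `OrcSchemas.wrapInStruct("name:string, age:int")` returns `"struct<name:string,age:int>"`.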
>
> 2018-06-26 22:38 GMT+02:00 sagar loke <sagarit2@xxxxxxxxx>:
>
>> @zhangminglei,
>>
>> Question about the schema for ORC format:
>>
>> 1. Does it always need to be of the complex type "<Struct>"?
>>
>> 2. Or can it be created directly from individual data types,
>>     e.g. "name:string, age:int"?
>>
>>
>> Thanks,
>> Sagar
>>
>> On Fri, Jun 22, 2018 at 11:56 PM, zhangminglei <18717838093@xxxxxxx>
>> wrote:
>>
>>> Yes, it should be "exit". Thanks, Ted Yu. Exactly right!
>>>
>>> Cheers
>>> Zhangminglei
>>>
>>> On Jun 23, 2018, at 12:40 PM, Ted Yu <yuzhihong@xxxxxxxxx> wrote:
>>>
>>> For #1, the word "exist" should be "exit", right?
>>> Thanks
>>>
>>> -------- Original message --------
>>> From: zhangminglei <18717838093@xxxxxxx>
>>> Date: 6/23/18 10:12 AM (GMT+08:00)
>>> To: sagar loke <sagarit2@xxxxxxxxx>
>>> Cc: dev <dev@xxxxxxxxxxxxxxxx>, user <user@xxxxxxxxxxxxxxxx>
>>> Subject: Re: [Flink-9407] Question about proposed ORC Sink !
>>>
>>> Hi, Sagar.
>>>
>>> 1. It solves the issue partially: files which have finished checkpointing
>>> don't show the .pending status, but files which were in progress when the
>>> program exits are still in the .pending state.
>>>
>>>
>>> Ans:
>>>
>>> Yes. If the program exits while a checkpoint has not yet finished, the
>>> files stay in the .pending state. Under normal circumstances, a program
>>> running in a production environment is never stopped or exited as long as
>>> everything is fine.
>>>
>>> 2. Ideally, the writer should work with the default settings, correct? That
>>> is, we shouldn't have to set these parameters explicitly to make it work.
>>>     Is this assumption correct?
>>>
>>>
>>> Ans:
>>>
>>> Yes. The writer works with the default settings.
>>> Yes. You do not have to set these parameters explicitly to make it work.
>>> Yes. Your assumption is correct.
>>>
>>> However, Flink is a real-time streaming framework, so in practice you
>>> usually don't stick with the default settings for a specific business case,
>>> especially when working together with an *offline end* (like Hadoop
>>> MapReduce). In that case, you need to tell the offline end when a bucket is
>>> closed and when the data for a specific bucket is ready. For that, take a
>>> look at https://issues.apache.org/jira/browse/FLINK-9609.
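One way to picture the "bucket ready" signal an offline consumer needs is a readiness check on a bucket directory. The sketch below is a hypothetical rule, not the FLINK-9609 design: it assumes a time-based bucket is ready once its time window has ended and none of its files still carries the `.in-progress` or `.pending` suffix seen elsewhere in this thread. `BucketReadiness` is an illustrative name only.

```java
import java.util.List;

// Hypothetical readiness check, NOT the FLINK-9609 implementation: a bucket
// is treated as ready for an offline consumer once its time window has closed
// and none of its part files is still in-progress or pending.
final class BucketReadiness {
    private BucketReadiness() {}

    static boolean isReady(List<String> fileNames, long windowEndMillis, long nowMillis) {
        if (nowMillis < windowEndMillis) {
            return false; // records for this bucket may still arrive
        }
        for (String name : fileNames) {
            // Suffixes as in "_part-0-107.in-progress" / "_part-0-107.pending".
            if (name.endsWith(".in-progress") || name.endsWith(".pending")) {
                return false;
            }
        }
        return true;
    }
}
```

A real signal would also have to be emitted somewhere the offline side can see it (for example a marker file), which this sketch leaves out.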
>>>
>>> Cheers
>>> Zhangminglei
>>>
>>>
>>> On Jun 23, 2018, at 8:23 AM, sagar loke <sagarit2@xxxxxxxxx> wrote:
>>>
>>> Hi Zhangminglei,
>>>
>>> Thanks for the reply.
>>>
>>> 1. It solves the issue partially: files which have finished checkpointing
>>> don't show the .pending status, but files which were in progress when the
>>> program exits are still in the .pending state.
>>>
>>> 2. Ideally, the writer should work with the default settings, correct? That
>>> is, we shouldn't have to set these parameters explicitly to make it work.
>>>     Is this assumption correct?
>>>
>>> Thanks,
>>> Sagar
>>>
>>> On Fri, Jun 22, 2018 at 3:25 AM, zhangminglei <18717838093@xxxxxxx>
>>> wrote:
>>>
>>>> Hi, Sagar. Please use the code below and you will see the part file status
>>>> change from _part-0-107.in-progress to _part-0-107.pending and finally to
>>>> part-0-107 (for example); you need to let the program run for a while.
>>>> However, we need to set some parameters, like the following. Moreover,
>>>> *enableCheckpointing* is also needed. I know why you always see the
>>>> *.pending* file: the default value of the parameters below is 60 seconds,
>>>> even if you enable checkpointing. That is why you cannot see the finished
>>>> file status until 60 seconds have passed.
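To see why the 60-second defaults delay the *.pending* to finished transition, here is a simplified timing model in plain Java. It rests on assumptions, not on the Flink source: inactivity checks fire at fixed multiples of the check interval, a bucket is closed at the first check where `now - lastWrite > threshold`, and the first checkpoint after the close finalizes the file instantly. `InactiveBucketTiming` is a hypothetical name.

```java
// Simplified timing model (assumptions, not Flink code). All times are in
// milliseconds; inactivity checks and checkpoints fire at multiples of their
// intervals, starting from t = 0.
final class InactiveBucketTiming {
    private InactiveBucketTiming() {}

    /** First multiple of step that is strictly greater than t. */
    static long nextTickAfter(long t, long step) {
        return (t / step + 1) * step;
    }

    /** When the idle bucket's part file is closed, i.e. becomes .pending. */
    static long closeTime(long lastWriteMs, long checkIntervalMs, long thresholdMs) {
        // The first check that sees now - lastWrite > threshold.
        return nextTickAfter(lastWriteMs + thresholdMs, checkIntervalMs);
    }

    /** When the file is finalized: the first checkpoint strictly after the close. */
    static long finishTime(long lastWriteMs, long checkIntervalMs, long thresholdMs,
                           long checkpointIntervalMs) {
        return nextTickAfter(closeTime(lastWriteMs, checkIntervalMs, thresholdMs),
                checkpointIntervalMs);
    }
}
```

Under this model, a bucket that goes idle at t = 0 with a 1 s checkpoint interval is finalized at 121 000 ms with the 60 s defaults and at 5 000 ms with 2000 ms settings, consistent with the minute-or-longer wait described above.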
>>>>
>>>> Attached is the result on my end, which shows what you want!
>>>>
>>>> Please let me know if you still have the problem.
>>>>
>>>> Cheers
>>>> Zhangminglei
>>>>
>>>> bucketingSink.setInactiveBucketCheckInterval(2000)
>>>>    .setInactiveBucketThreshold(2000);
>>>>
>>>>
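>>>> import org.apache.flink.runtime.state.memory.MemoryStateBackend;
>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>>>> import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
>>>> import org.apache.flink.types.Row;
>>>> // OrcFileWriter comes from the FLINK-9407 pull request.
>>>>
>>>> public class TestOrc {
>>>>    public static void main(String[] args) throws Exception {
>>>>       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>       env.setParallelism(1);
>>>>       // Pending files are only moved to "finished" when a checkpoint completes.
>>>>       env.enableCheckpointing(1000);
>>>>       env.setStateBackend(new MemoryStateBackend());
>>>>
>>>>       String orcSchemaString = "struct<name:string,age:int,married:boolean>";
>>>>       String path = "hdfs://10.199.196.0:9000/data/hive/man";
>>>>
>>>>       BucketingSink<Row> bucketingSink = new BucketingSink<>(path);
>>>>
>>>>       // Check for and close inactive buckets every 2 s instead of the 60 s default.
>>>>       bucketingSink
>>>>          .setWriter(new OrcFileWriter<>(orcSchemaString))
>>>>          .setInactiveBucketCheckInterval(2000)
>>>>          .setInactiveBucketThreshold(2000);
>>>>
>>>>       DataStream<Row> dataStream = env.addSource(new ManGenerator());
>>>>
>>>>       dataStream.addSink(bucketingSink);
>>>>
>>>>       env.execute();
>>>>    }
>>>>
>>>>    public static class ManGenerator implements SourceFunction<Row> {
>>>>
>>>>       // Set to false by cancel() so that run() can return.
>>>>       private volatile boolean running = true;
>>>>
>>>>       @Override
>>>>       public void run(SourceContext<Row> ctx) throws Exception {
>>>>          for (int i = 0; running && i < 2147483000; i++) {
>>>>             Row row = new Row(3);
>>>>             row.setField(0, "Sagar");
>>>>             row.setField(1, 26 + i);
>>>>             row.setField(2, false);
>>>>             // Emit under the checkpoint lock so records and checkpoints do not interleave.
>>>>             synchronized (ctx.getCheckpointLock()) {
>>>>                ctx.collect(row);
>>>>             }
>>>>          }
>>>>       }
>>>>
>>>>       @Override
>>>>       public void cancel() {
>>>>          running = false;
>>>>       }
>>>>    }
>>>> }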
>>>>
>>>> <filestatus.jpg>
>>>>
>>>>
>>>>
>>>> On Jun 22, 2018, at 11:14 AM, sagar loke <sagarit2@xxxxxxxxx> wrote:
>>>>
>>>> Sure, we can solve it together :)
>>>>
>>>> Are you able to reproduce it?
>>>>
>>>> Thanks,
>>>> Sagar
>>>>
>>>> On Thu, Jun 21, 2018 at 7:28 PM zhangminglei <18717838093@xxxxxxx>
>>>> wrote:
>>>>
>>>>> Sagar, flush() is called when a checkpoint is taken. Please see
>>>>>
>>>>> bucketState.currentFileValidLength = bucketState.writer.flush();
>>>>>
>>>>>
>>>>>
>>>>> @Override
>>>>> public void snapshotState(FunctionSnapshotContext context) throws Exception {
>>>>>    Preconditions.checkNotNull(restoredBucketStates, "The operator has not been properly initialized.");
>>>>>
>>>>>    restoredBucketStates.clear();
>>>>>
>>>>>    synchronized (state.bucketStates) {
>>>>>       int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
>>>>>
>>>>>       for (Map.Entry<String, BucketState<T>> bucketStateEntry : state.bucketStates.entrySet()) {
>>>>>          BucketState<T> bucketState = bucketStateEntry.getValue();
>>>>>
>>>>>          if (bucketState.isWriterOpen) {
>>>>>             bucketState.currentFileValidLength = bucketState.writer.flush();
>>>>>          }
>>>>>
>>>>>          synchronized (bucketState.pendingFilesPerCheckpoint) {
>>>>>             bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles);
>>>>>          }
>>>>>          bucketState.pendingFiles = new ArrayList<>();
>>>>>       }
>>>>>       restoredBucketStates.add(state);
>>>>>
>>>>>       if (LOG.isDebugEnabled()) {
>>>>>          LOG.debug("{} idx {} checkpointed {}.", getClass().getSimpleName(), subtaskIdx, state);
>>>>>       }
>>>>>    }
>>>>> }
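The snippet above is what makes checkpoints matter: each checkpoint files the current pending files under its checkpoint ID, and in BucketingSink they are renamed to finished only once that checkpoint is confirmed in `notifyCheckpointComplete`. Below is a simplified, I/O-free model of that bookkeeping; `PendingFileTracker` and its methods are hypothetical names, and files are plain strings.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of the pending-file bookkeeping (hypothetical names; no I/O).
final class PendingFileTracker {
    private final List<String> pendingFiles = new ArrayList<>();
    private final TreeMap<Long, List<String>> pendingFilesPerCheckpoint = new TreeMap<>();
    private final List<String> finishedFiles = new ArrayList<>();

    /** A part file was closed for writing and is now pending. */
    void fileClosed(String name) {
        pendingFiles.add(name);
    }

    /** Mirrors snapshotState: park the current pending files under this checkpoint ID. */
    void snapshot(long checkpointId) {
        pendingFilesPerCheckpoint.put(checkpointId, new ArrayList<>(pendingFiles));
        pendingFiles.clear();
    }

    /** On confirmation, finalize files from this and all earlier checkpoints. */
    void checkpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, List<String>>> it =
                pendingFilesPerCheckpoint.headMap(checkpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            finishedFiles.addAll(it.next().getValue());
            it.remove(); // removing through the view also removes from the backing map
        }
    }

    List<String> finished() {
        return finishedFiles;
    }

    /** Files that would still carry the .pending suffix if the job stopped now. */
    List<String> stillPending() {
        List<String> result = new ArrayList<>(pendingFiles);
        pendingFilesPerCheckpoint.values().forEach(result::addAll);
        return result;
    }
}
```

If the job stops after `snapshot(2)` but before `checkpointComplete(2)`, the files parked under checkpoint 2 stay pending, which matches Sagar's point 1 about files that were in progress when the program exits.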
>>>>>
>>>>>
>>>>>
>>>>> On Jun 22, 2018, at 10:21 AM, sagar loke <sagarit2@xxxxxxxxx> wrote:
>>>>>
>>>>> Thanks for replying.
>>>>>
>>>>> Yes, I tried different checkpoint intervals, e.g. 20, 100, 5000.
>>>>>
>>>>> env.enableCheckpointing(100);
>>>>>
>>>>> But in all cases, I still see the .pending state.
>>>>>
>>>>> Not sure if it's related to the flush() method of OrcFileWriter, which
>>>>> might somehow not be getting called?
>>>>>
>>>>> Thanks,
>>>>> Sagar
>>>>>
>>>>> On Thu, Jun 21, 2018 at 7:17 PM zhangminglei <18717838093@xxxxxxx>
>>>>> wrote:
>>>>>
>>>>>> Hi, Sagar
>>>>>>
>>>>>> Please take a look at BucketingSink. Its docs say that a file keeps the
>>>>>> .pending status if you DO NOT checkpoint: when a checkpoint is successful,
>>>>>> the currently pending files are moved to {@code finished}.
>>>>>> Please try again. I think you should call the method below and see
>>>>>> what happens. Meanwhile, I will also try it and see whether it
>>>>>> works. Please let me know if you still hit the error.
>>>>>>
>>>>>>  env.enableCheckpointing(200);
>>>>>>
>>>>>> /**
>>>>>>  * The suffix for {@code pending} part files. These are closed files that we are
>>>>>>  * not currently writing to (inactive or reached {@link #batchSize}), but which
>>>>>>  * were not yet confirmed by a checkpoint.
>>>>>>  */
>>>>>> private static final String DEFAULT_PENDING_SUFFIX = ".pending";
>>>>>>
>>>>>> /**
>>>>>>  * <p>Part files can be in one of three states: {@code in-progress}, {@code pending} or {@code finished}.
>>>>>>  * The reason for this is how the sink works together with the checkpointing mechanism to provide exactly-once
>>>>>>  * semantics and fault-tolerance. The part file that is currently being written to is {@code in-progress}. Once
>>>>>>  * a part file is closed for writing it becomes {@code pending}. When a checkpoint is successful the currently
>>>>>>  * pending files will be moved to {@code finished}.
>>>>>>  */
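The three states in this Javadoc can be modeled as a small state machine. This is a hypothetical sketch, not BucketingSink code; `PartFileState` is an illustrative name, and the file-name patterns follow the `_part-0-107.in-progress` → `_part-0-107.pending` → `part-0-107` sequence mentioned in this thread.

```java
// Hypothetical model of the part-file lifecycle described in the Javadoc.
enum PartFileState {
    IN_PROGRESS, PENDING, FINISHED;

    /** The writer closed the file (e.g. rollover or inactive bucket). */
    PartFileState close() {
        if (this != IN_PROGRESS) {
            throw new IllegalStateException("close() called in state " + this);
        }
        return PENDING;
    }

    /** A checkpoint covering this file completed successfully. */
    PartFileState checkpointCompleted() {
        // Only pending files are promoted; in-progress and finished files are unchanged.
        return this == PENDING ? FINISHED : this;
    }

    /** On-disk name for a part file with the given base name, e.g. "part-0-107". */
    String fileName(String base) {
        switch (this) {
            case IN_PROGRESS: return "_" + base + ".in-progress";
            case PENDING:     return "_" + base + ".pending";
            default:          return base;
        }
    }
}
```

The key point the model captures is that a completed checkpoint promotes only files that were already closed; a file still being written stays in-progress.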
>>>>>>
>>>>>>
>>>>>> Cheers
>>>>>> Zhangminglei
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Jun 22, 2018, at 4:46 AM, sagar loke <sagarit2@xxxxxxxxx> wrote:
>>>>>>
>>>>>> Thanks, Zhangminglei, for the quick response.
>>>>>>
>>>>>> I tried the above code and I am seeing another issue: the files
>>>>>> created on HDFS are always in the *.pending* state.
>>>>>>
>>>>>> Let me know if you can reproduce it.
>>>>>>
>>>>>> Thanks,
>>>>>> Sagar
>>>>>>
>>>>>> On Thu, Jun 21, 2018 at 3:20 AM, zhangminglei <18717838093@xxxxxxx>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi, Sagar
>>>>>>>
>>>>>>> I ran a local test and it seems to work fine for me. The PR for
>>>>>>> [FLINK-9407] will be updated.
>>>>>>>
>>>>>>> I will push the newest code to the PR soon; below is the example I
>>>>>>> used for my test. You can check it again. Hope you enjoy it!
>>>>>>>
>>>>>>> Cheers
>>>>>>> Zhangminglei.
>>>>>>>
>>>>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>>>>>>> import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
>>>>>>> import org.apache.flink.types.Row;
>>>>>>>
>>>>>>> public class TestOrc {
>>>>>>>    public static void main(String[] args) throws Exception {
>>>>>>>       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>       env.setParallelism(1);
>>>>>>>
>>>>>>>       String orcSchemaString = "struct<name:string,age:int,married:boolean>";
>>>>>>>       String path = "hdfs://10.199.196.0:9000/data/hive/man";
>>>>>>>
>>>>>>>       BucketingSink<Row> bucketingSink = new BucketingSink<>(path);
>>>>>>>
>>>>>>>       bucketingSink.setWriter(new OrcFileWriter<>(orcSchemaString));
>>>>>>>
>>>>>>>       DataStream<Row> dataStream = env.addSource(new ManGenerator());
>>>>>>>
>>>>>>>       dataStream.addSink(bucketingSink);
>>>>>>>
>>>>>>>       env.execute();
>>>>>>>    }
>>>>>>>
>>>>>>>    public static class ManGenerator implements SourceFunction<Row> {
>>>>>>>
>>>>>>>       @Override
>>>>>>>       public void run(SourceContext<Row> ctx) throws Exception {
>>>>>>>          for (int i = 0; i < 3; i++) {
>>>>>>>             Row row = new Row(3);
>>>>>>>             row.setField(0, "Sagar");
>>>>>>>             row.setField(1, 26 + i);
>>>>>>>             row.setField(2, false);
>>>>>>>             ctx.collect(row);
>>>>>>>          }
>>>>>>>       }
>>>>>>>
>>>>>>>       @Override
>>>>>>>       public void cancel() {
>>>>>>>
>>>>>>>       }
>>>>>>>    }
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Jun 21, 2018, at 1:47 AM, sagar loke <sagarit2@xxxxxxxxx> wrote:
>>>>>>>
>>>>>>> Hi Zhangminglei,
>>>>>>>
>>>>>>> Question about https://issues.apache.org/jira/browse/FLINK-9407
>>>>>>>
>>>>>>> I tried to use the code from the PR and ran it on a local HDFS cluster to
>>>>>>> write some ORC data.
>>>>>>>
>>>>>>> But somehow this code fails with the following error:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): Failed to CREATE_FILE /tmp/hivedatanew2/2018-06-20/10/34/_part-0-0.in-progress for DFSClient_NONMAPREDUCE_73219864_36 on 127.0.0.1 because this file lease is currently owned by DFSClient_NONMAPREDUCE_-1374584007_36 on 127.0.0.1
>>>>>>>>
>>>>>>>> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2500)
>>>>>>>
>>>>>>>
>>>>>>> I understand that this error is related to Hadoop, but somehow I get
>>>>>>> this error only when executing the code from this PR.
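The error above comes from HDFS's single-writer rule: while one client holds the lease on a file that is still being written, another CREATE on that path is rejected with `AlreadyBeingCreatedException`. The toy model below illustrates only that rule. It is not Hadoop code, `LeaseTable` is a hypothetical name, and it does not claim to identify which component in the PR opened the file a second time.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of HDFS's single-writer lease rule (not Hadoop code): while a
// client holds the lease on a path, any further CREATE on that path fails.
final class LeaseTable {
    private final Map<String, String> holderByPath = new HashMap<>();

    void create(String path, String clientName) {
        String holder = holderByPath.get(path);
        if (holder != null) {
            throw new IllegalStateException("Failed to CREATE_FILE " + path + " for " + clientName
                    + " because this file lease is currently owned by " + holder);
        }
        holderByPath.put(path, clientName);
    }

    /** Closing the file releases the lease held by that client. */
    void close(String path, String clientName) {
        holderByPath.remove(path, clientName);
    }
}
```

In this model, a second `create` on the same in-progress path succeeds only after the first holder calls `close`.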
>>>>>>>
>>>>>>> I had created a very crude way to write ORC files to HDFS, as
>>>>>>> follows. The code below works fine and does not throw the above error.
>>>>>>>
>>>>>>> import org.apache.flink.streaming.connectors.fs.Writer;
>>>>>>>> import org.apache.hadoop.fs.FileSystem;
>>>>>>>> import org.apache.hadoop.fs.Path;
>>>>>>>> import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
>>>>>>>> import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
>>>>>>>> import org.apache.orc.OrcFile;
>>>>>>>> import org.apache.orc.TypeDescription;
>>>>>>>> import org.apache.hadoop.conf.Configuration;
>>>>>>>>
>>>>>>>> import java.io.IOException;
>>>>>>>>
>>>>>>>> public class FlinkOrcWriterV1<T> implements org.apache.flink.streaming.connectors.fs.Writer<T> {
>>>>>>>>
>>>>>>>>     private transient org.apache.orc.Writer orcWriter;
>>>>>>>>     String schema;
>>>>>>>>     TypeDescription typeDescriptionschema;//"struct<x:int,y:int>"
>>>>>>>>     String basePath;
>>>>>>>>
>>>>>>>>     public FlinkOrcWriterV1(String schema) {
>>>>>>>>         this.schema = schema;
>>>>>>>>         this.typeDescriptionschema = TypeDescription.fromString(schema);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public void open(FileSystem fs, Path path) throws IOException {
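>>>>>>>>         Configuration conf = new Configuration();
>>>>>>>>         // NOTE: the 'path' passed in by BucketingSink is ignored; every writer
>>>>>>>>         // instance writes to this fixed location.
>>>>>>>>         orcWriter = OrcFile.createWriter(new Path("hdfs://localhost:9000/tmp/hivedata3/"),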
>>>>>>>>                     OrcFile.writerOptions(conf)
>>>>>>>>                         .setSchema(typeDescriptionschema));
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public long flush() throws IOException {
>>>>>>>>         return orcWriter.writeIntermediateFooter();
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public long getPos() throws IOException {
>>>>>>>>         return orcWriter.getRawDataSize();
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public void close() throws IOException {
>>>>>>>>         orcWriter.close();
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public void write(T element) throws IOException {
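>>>>>>>>         // NOTE: 'element' is not used; every call writes 10 sample rows
>>>>>>>>         // (x = r, y = r * 3) for the schema "struct<x:int,y:int>".
>>>>>>>>         VectorizedRowBatch batch = typeDescriptionschema.createRowBatch(10);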
>>>>>>>>         LongColumnVector x = (LongColumnVector) batch.cols[0];
>>>>>>>>         LongColumnVector y = (LongColumnVector) batch.cols[1];
>>>>>>>>         for(int r=0; r < 10; ++r) {
>>>>>>>>             int row = batch.size++;
>>>>>>>>             x.vector[row] = r;
>>>>>>>>             y.vector[row] = r * 3;
>>>>>>>>             // If the batch is full, write it out and start over.
>>>>>>>>             if (batch.size == batch.getMaxSize()) {
>>>>>>>>                 orcWriter.addRowBatch(batch);
>>>>>>>>                 batch.reset();
>>>>>>>>             }
>>>>>>>>         }
>>>>>>>>         if (batch.size != 0) {
>>>>>>>>             orcWriter.addRowBatch(batch);
>>>>>>>>             batch.reset();
>>>>>>>>         }
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public FlinkOrcWriterV1<T> duplicate() {
>>>>>>>>         return new FlinkOrcWriterV1<>(schema);
>>>>>>>>     }
>>>>>>>> }
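Sagar's `write()` above creates a new 10-row batch on every call and fills it with sample values, so `element` never reaches the file. A common alternative is to keep one batch open across calls and hand it to the ORC writer only when it is full, or on `flush()`/`close()`. The following is a plain-Java sketch of that buffering pattern; it assumes that converting `element` into ORC column vectors happens elsewhere, and `RowBatcher` is a hypothetical class that uses a `List` in place of `VectorizedRowBatch`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java sketch of per-element batching (hypothetical class, no ORC types):
// one batch stays open across write() calls, is handed downstream when full,
// and flush() hands off any partial batch.
final class RowBatcher<R> {
    private final int maxSize;
    private final Consumer<List<R>> downstream; // e.g. a wrapper around orcWriter.addRowBatch(...)
    private List<R> batch = new ArrayList<>();

    RowBatcher(int maxSize, Consumer<List<R>> downstream) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        this.maxSize = maxSize;
        this.downstream = downstream;
    }

    /** Called once per incoming element, like Writer.write(T). */
    void add(R row) {
        batch.add(row);
        if (batch.size() == maxSize) {
            emit();
        }
    }

    /** Hands off a partial batch, as a flush() or close() implementation would. */
    void flush() {
        if (!batch.isEmpty()) {
            emit();
        }
    }

    private void emit() {
        downstream.accept(batch);
        batch = new ArrayList<>(); // start a fresh batch instead of reusing the emitted one
    }
}
```

With a batch size of 2 and five elements, two full batches are emitted immediately and the fifth element is emitted on `flush()`.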
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> I'm not sure whether the error is related to one of the Hadoop
>>>>>>> dependencies or to something else.
>>>>>>>
>>>>>>> Can you please look into it and let me know whether you can reproduce it
>>>>>>> on your end too?
>>>>>>>
>>>>>>> By the way, these are the dependencies in my project:
>>>>>>>
>>>>>>> dependencies {
>>>>>>>>
>>>>>>>>     compile 'org.apache.flink:flink-java:1.4.2'
>>>>>>>>     compile 'org.apache.flink:flink-runtime_2.11:1.4.2'
>>>>>>>>     compile 'org.apache.flink:flink-streaming-java_2.11:1.4.2'
>>>>>>>>     compile 'org.apache.flink:flink-connector-kafka-0.11_2.11:1.4.2'
>>>>>>>>     compile 'org.apache.flink:flink-connector-elasticsearch5_2.11:1.4.2'
>>>>>>>>     compile 'io.confluent:kafka-avro-serializer:3.3.0'
>>>>>>>>     compile 'org.apache.flink:flink-avro:1.4.2'
>>>>>>>>     compile group: 'org.apache.kafka', name: 'kafka_2.11', version: '1.1.0'
>>>>>>>>     compile group: 'org.apache.flink', name: 'flink-shaded-hadoop2', version: '1.4.2'
>>>>>>>>     compile 'org.apache.flink:flink-connector-filesystem_2.11:1.4.2'
>>>>>>>>     compile group: 'org.apache.flink', name: 'flink-jdbc', version: '1.4.2'
>>>>>>>>     compile group: 'org.apache.flink', name: 'flink-table_2.11', version: '1.4.2'
>>>>>>>>     compile group: 'org.apache.orc', name: 'orc-core', version: '1.5.1'
>>>>>>>>     compile group: 'org.apache.parquet', name: 'parquet-avro', version: '1.10.0'
>>>>>>>>     compile group: 'org.apache.parquet', name: 'parquet-common', version: '1.10.0'
>>>>>>>>     compile group: 'org.apache.flink', name: 'flink-orc_2.11', version: '1.4.2'
>>>>>>>>     testCompile group: 'junit', name: 'junit', version: '4.12'
>>>>>>>>
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Thanks,
>>>>>>> Sagar.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Regards,
>>>>>> SAGAR.
>>>>>>
>>>>>>
>>>>>> --
>>>>> Cheers,
>>>>> Sagar
>>>>>
>>>>>
>>>>> --
>>>> Cheers,
>>>> Sagar
>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Regards,
>>> SAGAR.
>>>
>>>
>>>
>>>
>>
>>
>> --
>> Regards,
>> SAGAR.
>>
>
>


-- 
Regards,
SAGAR.