git.net


Re: C++ RecordBatchWriter/ReadRecordBatch clarification


> So, I guess the ReadRecordBatch function is intended to only work if the records were written by RecordBatchFileWriter, right?

The *StreamWriter and *FileWriter classes use identical code paths for
writing the IPC messages; the only differences are the preamble (the
magic number and padding) and the file footer written at the end. See
https://github.com/apache/arrow/blob/master/cpp/src/arrow/ipc/writer.cc#L919

https://github.com/apache/arrow/blob/master/format/IPC.md#file-format

I'm looking at your code in

https://github.com/Paradigm4/accelerated_io_tools/blob/master/src/PhysicalAioSave.cpp#L1547

It is not going to work because you wrote a stream that includes
the schema as the first message. If you use
arrow::ipc::WriteRecordBatch instead (it writes only the record batch
message, without the schema), then things will work fine.
Another option is to use the generic ReadMessage function twice,
skipping the schema message if you don't need it.

Hope this helps
Wes

On Sun, Apr 22, 2018 at 2:33 PM, Rares Vernica <rvernica@xxxxxxxxx> wrote:
> Hi Dimitri,
>
> Thanks for that. I was doing something like that. My hope was to use
> StreamWriter to write the batch and then use ReadRecordBatch to read it,
> since it is more succinct and I know I only have one batch to read.
>
> Here is the actual code I use
> https://github.com/Paradigm4/accelerated_io_tools/blob/master/src/PhysicalAioSave.cpp#L1547
> and above it, commented out, is the code I would like to use.
>
> So, I guess the ReadRecordBatch function is intended to only work if the
> records were written by RecordBatchFileWriter, right?
>
> Cheers,
> Rares
>
>
> On Tue, Apr 17, 2018 at 1:00 AM, Dimitri Vorona <alendit@xxxxxxxxxxxxxx>
> wrote:
>
>> Hi Rares,
>>
>> you need a different reader (RecordBatchStreamReader) for RecordBatch
>> streams. See arrow/ipc/ipc-read-write-test.cc:569-596 for the gist. Also,
>> the second argument to arrow::RecordBatch::Make is the number of rows in
>> the batch, so you have to set it to 1 in your example.
>>
>> See https://gist.github.com/alendit/c6cdd1adaf7007786392731152d3b6b9
>>
>> Cheers,
>> Dimitri.
>>
>> On Tue, Apr 17, 2018 at 3:52 AM, Rares Vernica <rvernica@xxxxxxxxx> wrote:
>>
>> > Hi,
>> >
>> > I'm writing a batch of records to a stream and I want to read them
>> > later. I notice that if I use the RecordBatchStreamWriter class to write
>> > them and then the ReadRecordBatch function to read them, I get a
>> > segmentation fault.
>> >
>> > On the other hand, if I use the RecordBatchFileWriter class to write
>> > them, the reading works fine.
>> >
>> > So, is the arrow::ipc::ReadRecordBatch function intended to only work if
>> > the records were written by RecordBatchFileWriter?
>> >
>> > Below is a complete example showing the two cases. I tried this on
>> > Ubuntu Trusty with Arrow 0.9.0-1.
>> >
>> > Thanks!
>> > Rares
>> >
>> >
>> >
>> >
>> >
>> > // g++-4.9 -ggdb -std=c++11 foo.cpp -larrow
>> >
>> > #include <arrow/builder.h>
>> > #include <arrow/io/memory.h>
>> > #include <arrow/ipc/reader.h>
>> > #include <arrow/ipc/writer.h>
>> > #include <arrow/memory_pool.h>
>> > #include <arrow/record_batch.h>
>> > #include <arrow/type.h>
>> >
>> > int main()
>> > {
>> >     arrow::MemoryPool* pool = arrow::default_memory_pool();
>> >
>> >     std::shared_ptr<arrow::PoolBuffer> buffer(new arrow::PoolBuffer(pool));
>> >
>> >     arrow::Int64Builder builder(pool);
>> >     builder.Append(1);
>> >
>> >     std::shared_ptr<arrow::Array> array;
>> >     builder.Finish(&array);
>> >
>> >     std::vector<std::shared_ptr<arrow::Field>> schema_vector =
>> >         {arrow::field("id", arrow::int64())};
>> >
>> >     auto schema = std::make_shared<arrow::Schema>(schema_vector);
>> >
>> >
>> >     // Write
>> >     std::shared_ptr<arrow::RecordBatch> batchOut;
>> >     batchOut = arrow::RecordBatch::Make(schema, 10, {array});
>> >
>> >     std::unique_ptr<arrow::io::BufferOutputStream> stream;
>> >     stream.reset(new arrow::io::BufferOutputStream(buffer));
>> >
>> >     std::shared_ptr<arrow::ipc::RecordBatchWriter> writer;
>> >
>> >     // #1 - Segmentation fault (core dumped)
>> >     arrow::ipc::RecordBatchStreamWriter::Open(stream.get(), schema, &writer);
>> >
>> >     // #2 - OK
>> >     //arrow::ipc::RecordBatchFileWriter::Open(stream.get(), schema, &writer);
>> >
>> >     writer->WriteRecordBatch(*batchOut);
>> >
>> >     writer->Close();
>> >     stream->Close();
>> >
>> >
>> >     // Read
>> >     arrow::io::BufferReader reader(buffer);
>> >     std::shared_ptr<arrow::RecordBatch> batchIn;
>> >     arrow::ipc::ReadRecordBatch(schema, &reader, &batchIn);
>> > }
>> >
>>