Re: How to append to parquet file periodically and read intermediate data - pyarrow.lib.ArrowIOError: Invalid parquet file. Corrupt footer.


Some answers / ideas:

The typical: write to Kafka
The fashionable: Pravega (has an Apache Flink connector)
The future: wait for erasure coding in HDFS 3

On Wed, 19 Dec 2018 at 16:41, Wes McKinney <wesmckinn@xxxxxxxxx> wrote:

> We could certainly develop some tools in C++ and/or Python to assist
> with the compaction workflows. If you have an idea about how these
> might look and be generally useful, please feel free to propose in a
> JIRA issue
>
> On Wed, Dec 19, 2018 at 9:09 AM Joel Pfaff <joel.pfaff@xxxxxxxxx> wrote:
> >
> > Unfortunately I cannot use kudu in my projects, I would have loved to
> > give it a try. I did not know about hudi, it seems very similar to what
> > we do (Parquet + Avro), I will have a look.
> > I am following the iceberg project very closely, because it appears to
> > solve a lot of problems that we face on a regular basis.
> > I am really excited to learn that the arrow and iceberg projects could
> > work together and I can hope for a lot of good things coming out of these.
> >
> > On Wed, Dec 19, 2018 at 2:52 PM Uwe L. Korn <uwelk@xxxxxxxxxx> wrote:
> >
> > > This can also be solved by using a table format like
> > > https://github.com/uber/hudi or
> > > https://github.com/apache/incubator-iceberg where the latter has a PR
> > > open for a basic Python implementation with pyarrow.
> > >
> > > These table formats support using Avro and Parquet seamlessly together
> > > without the reader needing to take care of the storage format.
> > >
> > > Uwe
> > >
> > > > On 19.12.2018 at 14:47, Wes McKinney <wesmckinn@xxxxxxxxx> wrote:
> > > >
> > > > This turns out to be a very common problem (landing incremental
> > > > updates, dealing with compaction and small files). It's part of the
> > > > reason that systems like Apache Kudu were developed, e.g.
> > > >
> > > > https://blog.cloudera.com/blog/2015/11/how-to-ingest-and-query-fast-data-with-impala-without-kudu/
> > > >
> > > > If you have to use file storage, then figuring out a scheme to
> > > > compact Parquet files (e.g. once per hour, once per day) will
> > > > definitely be worth it compared with using a slower file format
> > > > (like Avro)
> > > >
> > > > - Wes
> > > >
> > > >> On Wed, Dec 19, 2018 at 7:37 AM Joel Pfaff <joel.pfaff@xxxxxxxxx> wrote:
> > > >>
> > > >> Hello,
> > > >>
> > > >> For my company's use cases, we have found that the number of files
> > > >> was a critical part of the time spent building the execution plan,
> > > >> so we found the idea of very regularly writing small Parquet files
> > > >> to be rather inefficient.
> > > >>
> > > >> There are some formats that support `append` semantics (I have
> > > >> tested successfully with Avro, but there are a couple of others
> > > >> that could be used similarly).
> > > >> So we had a few cases where we were aggregating data in a `current
> > > >> table` stored as a set of Avro files, and rewriting all of it into
> > > >> a few Parquet files at the end of the day.
> > > >> This allowed us to have files that had been prepared to optimize
> > > >> their querying performance (file size, row group size, sorting per
> > > >> column) by maximizing the ability to benefit from the statistics.
> > > >> And our queries were doing a UNION between "optimized for speed"
> > > >> history tables and "optimized for latency" current tables when the
> > > >> query timeframe crossed the boundary of the current day.
> > > >>
> > > >> Regards, Joel
> > > >>
> > > >> On Wed, Dec 19, 2018 at 2:14 PM Francois Saint-Jacques <fsaintjacques@xxxxxxxxxxxxxxx> wrote:
> > > >>
> > > >>> Hello Darren,
> > > >>>
> > > >>> What Uwe suggests is usually the way to go: your active process
> > > >>> writes to a new file every time. Then you have a parallel
> > > >>> process/thread that compacts the smaller files in the background
> > > >>> so that you don't end up with too many files.
> > > >>>
> > > >>>> On Wed, Dec 19, 2018 at 7:59 AM Uwe L. Korn <uwelk@xxxxxxxxxx> wrote:
> > > >>>>
> > > >>>> Hello Darren,
> > > >>>>
> > > >>>> You're out of luck here. Parquet files are immutable and meant for
> > > >>>> batch writes. Once they're written you cannot modify them anymore.
> > > >>>> To load them, you need to know their metadata, which is in the
> > > >>>> footer. The footer is always at the end of the file and is only
> > > >>>> written once you call close.
> > > >>>>
> > > >>>> Your use case is normally fulfilled by continuously starting new
> > > >>>> files and reading them back in using the ParquetDataset class.
> > > >>>>
> > > >>>> Cheers
> > > >>>> Uwe
> > > >>>>
> > > >>>> On 18.12.2018 at 21:03, Darren Gallagher <dazzag@xxxxxxxxx> wrote:
> > > >>>>
> > > >>>>>> [Cross posted from https://github.com/apache/arrow/issues/3203]
> > > >>>>>>
> > > >>>>>> I'm adding new data to a parquet file every 60 seconds using
> > > >>>>>> this code:
> > > >>>>>>
> > > >>>>>> import os
> > > >>>>>> import json
> > > >>>>>> import time
> > > >>>>>> import requests
> > > >>>>>> import pandas as pd
> > > >>>>>> import numpy as np
> > > >>>>>> import pyarrow as pa
> > > >>>>>> import pyarrow.parquet as pq
> > > >>>>>>
> > > >>>>>> api_url = 'https://opensky-network.org/api/states/all'
> > > >>>>>>
> > > >>>>>> cols = ['icao24', 'callsign', 'origin', 'time_position',
> > > >>>>>>       'last_contact', 'longitude', 'latitude',
> > > >>>>>>       'baro_altitude', 'on_ground', 'velocity', 'true_track',
> > > >>>>>>       'vertical_rate', 'sensors', 'geo_altitude', 'squawk',
> > > >>>>>>       'spi', 'position_source']
> > > >>>>>>
> > > >>>>>> def get_new_flight_info(writer):
> > > >>>>>>   print("Requesting new data")
> > > >>>>>>   req = requests.get(api_url)
> > > >>>>>>   content = req.json()
> > > >>>>>>
> > > >>>>>>   states = content['states']
> > > >>>>>>   df = pd.DataFrame(states, columns = cols)
> > > >>>>>>   df['timestamp'] = content['time']
> > > >>>>>>   print("Found {} new items".format(len(df)))
> > > >>>>>>
> > > >>>>>>   table = pa.Table.from_pandas(df)
> > > >>>>>>   if writer is None:
> > > >>>>>>       writer = pq.ParquetWriter('openskyflights.parquet',
> > > >>>>>>                                 table.schema)
> > > >>>>>>   writer.write_table(table=table)
> > > >>>>>>   return writer
> > > >>>>>>
> > > >>>>>> if __name__ == '__main__':
> > > >>>>>>   writer = None
> > > >>>>>>   while (not os.path.exists('opensky.STOP')):
> > > >>>>>>       writer = get_new_flight_info(writer)
> > > >>>>>>       time.sleep(60)
> > > >>>>>>
> > > >>>>>>   if writer:
> > > >>>>>>       writer.close()
> > > >>>>>>
> > > >>>>>> This is working fine and the file grows every 60 seconds.
> > > >>>>>> However, unless I force the loop to exit I am unable to use the
> > > >>>>>> parquet file. In a separate terminal I try to access the parquet
> > > >>>>>> file using this code:
> > > >>>>>>
> > > >>>>>> import pandas as pd
> > > >>>>>> import pyarrow.parquet as pq
> > > >>>>>>
> > > >>>>>> table = pq.read_table("openskyflights.parquet")
> > > >>>>>> df = table.to_pandas()
> > > >>>>>> print(len(df))
> > > >>>>>>
> > > >>>>>> which results in this error:
> > > >>>>>>
> > > >>>>>> Traceback (most recent call last):
> > > >>>>>> File "checkdownloadsize.py", line 7, in <module>
> > > >>>>>>   table = pq.read_table("openskyflights.parquet")
> > > >>>>>> File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 1074, in read_table
> > > >>>>>>   use_pandas_metadata=use_pandas_metadata)
> > > >>>>>> File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/filesystem.py", line 182, in read_parquet
> > > >>>>>>   filesystem=self)
> > > >>>>>> File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 882, in __init__
> > > >>>>>>   self.validate_schemas()
> > > >>>>>> File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 895, in validate_schemas
> > > >>>>>>   self.schema = self.pieces[0].get_metadata(open_file).schema
> > > >>>>>> File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 453, in get_metadata
> > > >>>>>>   return self._open(open_file_func).metadata
> > > >>>>>> File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 459, in _open
> > > >>>>>>   reader = open_file_func(self.path)
> > > >>>>>> File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 984, in open_file
> > > >>>>>>   common_metadata=self.common_metadata)
> > > >>>>>> File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 102, in __init__
> > > >>>>>>   self.reader.open(source, metadata=metadata)
> > > >>>>>> File "pyarrow/_parquet.pyx", line 639, in pyarrow._parquet.ParquetReader.open
> > > >>>>>> File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
> > > >>>>>> pyarrow.lib.ArrowIOError: Invalid parquet file. Corrupt footer.
> > > >>>>>>
> > > >>>>>> Is there a way to achieve this?
> > > >>>>>> I'm assuming that if I call writer.close() in the while loop then
> > > >>>>>> it will prevent any further data being written to the file? Is
> > > >>>>>> there some kind of "flush" operation that can be used to ensure
> > > >>>>>> all data is written to disk and available to other processes or
> > > >>>>>> threads that want to read the data?
> > > >>>>>>
> > > >>>>>> Thanks
> > > >>>>>>
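To make the "Corrupt footer" error above more concrete: a finished Parquet file starts with the 4-byte magic `PAR1` and ends with the footer metadata, a 4-byte little-endian footer length, and `PAR1` again. A file whose writer is still open has no such tail yet. The sketch below checks only that framing, using the standard library; `has_complete_footer` is a hypothetical helper, not part of pyarrow, and passing this check does not prove the metadata itself is valid.

```python
import os
import struct
import tempfile

PARQUET_MAGIC = b"PAR1"


def has_complete_footer(path):
    """Return True if the file has the framing of a closed Parquet file."""
    size = os.path.getsize(path)
    # Smallest possible framing: leading magic + footer length + trailing magic.
    if size < 12:
        return False
    with open(path, "rb") as f:
        if f.read(4) != PARQUET_MAGIC:
            return False
        f.seek(-8, os.SEEK_END)
        tail = f.read(8)
    footer_length = struct.unpack("<I", tail[:4])[0]
    # The footer must fit between the leading magic and the 8-byte tail.
    return tail[4:] == PARQUET_MAGIC and 0 < footer_length <= size - 12


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as root:
        # Fabricated bytes that only mimic the framing, for demonstration.
        closed = os.path.join(root, "closed.bin")
        with open(closed, "wb") as f:
            f.write(b"PAR1" + b"m" * 10 + struct.pack("<I", 10) + b"PAR1")
        still_open = os.path.join(root, "open.bin")
        with open(still_open, "wb") as f:
            f.write(b"PAR1" + b"row group bytes, no footer yet")
        print(has_complete_footer(closed), has_complete_footer(still_open))
```

A reader could use a check like this to skip a file that is still being written instead of failing on it.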
> > > >>>>
> > > >>>>
> > > >>>
> > > >>> --
> > > >>> Sent from my jetpack.
> > > >>>
> > >
>