Re: How to append to parquet file periodically and read intermediate data - pyarrow.lib.ArrowIOError: Invalid parquet file. Corrupt footer.


This can also be solved by using a table format such as https://github.com/uber/hudi or https://github.com/apache/incubator-iceberg; the latter has an open PR for a basic Python implementation using pyarrow.

These table formats let you use Avro and Parquet together seamlessly, without the reader having to care about the underlying storage format.

Uwe

> On 19.12.2018 at 14:47, Wes McKinney <wesmckinn@xxxxxxxxx> wrote:
> 
> This turns out to be a very common problem (landing incremental
> updates, dealing with compaction and small files). It's part of the
> reason that systems like Apache Kudu were developed, e.g.
> 
> https://blog.cloudera.com/blog/2015/11/how-to-ingest-and-query-fast-data-with-impala-without-kudu/
> 
> If you have to use file storage, then figuring out a scheme to compact
> Parquet files (e.g. once per hour, once per day) will definitely be
> worth it compared with using a slower file format (like Avro).
> 
> - Wes
> 
>> On Wed, Dec 19, 2018 at 7:37 AM Joel Pfaff <joel.pfaff@xxxxxxxxx> wrote:
>> 
>> Hello,
>> 
>> For my company's use cases, we have found that the number of files was a
>> critical part of the time spent building the execution plan, so we found
>> the idea of very regularly writing small Parquet files to be rather inefficient.
>> 
>> There are some formats that support `append` semantics (I have tested
>> this successfully with Avro, but there are a couple of others that could
>> be used similarly).
>> So we had a few cases where we were aggregating data in a `current table`
>> in a set of Avro files, and rewriting all of it into a few Parquet files at
>> the end of the day.
>> This gave us files prepared for query performance (file size, row group
>> size, per-column sorting), making the most of the column statistics.
>> Our queries then did a UNION between the "optimized for speed" history
>> tables and the "optimized for latency" current tables whenever the query
>> timeframe crossed the boundary of the current day.
>> 
>> Regards, Joel
>> 
>> On Wed, Dec 19, 2018 at 2:14 PM Francois Saint-Jacques <fsaintjacques@xxxxxxxxxxxxxxx> wrote:
>> 
>>> Hello Darren,
>>> 
>>> What Uwe suggests is usually the way to go: your active process writes to a
>>> new file every time, and a parallel process/thread compacts the smaller
>>> files in the background so that you don't end up with too many files.
>>> 
>>>> On Wed, Dec 19, 2018 at 7:59 AM Uwe L. Korn <uwelk@xxxxxxxxxx> wrote:
>>>> 
>>>> Hello Darren,
>>>> 
>>>> You're out of luck here. Parquet files are immutable and meant for batch
>>>> writes. Once they're written, you cannot modify them anymore. To load them,
>>>> you need their metadata, which is stored in the footer. The footer is
>>>> always at the end of the file and is only written when you call close().
>>>> 
>>>> Your use case is normally fulfilled by continuously starting new files and
>>>> reading them back in using the ParquetDataset class.
>>>> 
>>>> Cheers
>>>> Uwe
>>>> 
>>>> On 18.12.2018 at 21:03, Darren Gallagher <dazzag@xxxxxxxxx> wrote:
>>>> 
>>>>>> [Cross posted from https://github.com/apache/arrow/issues/3203]
>>>>>> 
>>>>>> I'm adding new data to a Parquet file every 60 seconds using this code:
>>>>>> 
>>>>>> import os
>>>>>> import json
>>>>>> import time
>>>>>> import requests
>>>>>> import pandas as pd
>>>>>> import numpy as np
>>>>>> import pyarrow as pa
>>>>>> import pyarrow.parquet as pq
>>>>>> 
>>>>>> api_url = 'https://opensky-network.org/api/states/all'
>>>>>> 
>>>>>> cols = ['icao24', 'callsign', 'origin', 'time_position',
>>>>>>       'last_contact', 'longitude', 'latitude',
>>>>>>       'baro_altitude', 'on_ground', 'velocity', 'true_track',
>>>>>>       'vertical_rate', 'sensors', 'geo_altitude', 'squawk',
>>>>>>       'spi', 'position_source']
>>>>>> 
>>>>>> def get_new_flight_info(writer):
>>>>>>   print("Requesting new data")
>>>>>>   req = requests.get(api_url)
>>>>>>   content = req.json()
>>>>>> 
>>>>>>   states = content['states']
>>>>>>   df = pd.DataFrame(states, columns = cols)
>>>>>>   df['timestamp'] = content['time']
>>>>>>   print("Found {} new items".format(len(df)))
>>>>>> 
>>>>>>   table = pa.Table.from_pandas(df)
>>>>>>   if writer is None:
>>>>>>       writer = pq.ParquetWriter('openskyflights.parquet', table.schema)
>>>>>>   writer.write_table(table=table)
>>>>>>   return writer
>>>>>> 
>>>>>> if __name__ == '__main__':
>>>>>>   writer = None
>>>>>>   while (not os.path.exists('opensky.STOP')):
>>>>>>       writer = get_new_flight_info(writer)
>>>>>>       time.sleep(60)
>>>>>> 
>>>>>>   if writer:
>>>>>>       writer.close()
>>>>>> 
>>>>>> This is working fine and the file grows every 60 seconds.
>>>>>> However, unless I force the loop to exit, I am unable to use the Parquet
>>>>>> file. In a separate terminal I try to access the Parquet file using this
>>>>>> code:
>>>>>> 
>>>>>> import pandas as pd
>>>>>> import pyarrow.parquet as pq
>>>>>> 
>>>>>> table = pq.read_table("openskyflights.parquet")
>>>>>> df = table.to_pandas()
>>>>>> print(len(df))
>>>>>> 
>>>>>> which results in this error:
>>>>>> 
>>>>>> Traceback (most recent call last):
>>>>>>   File "checkdownloadsize.py", line 7, in <module>
>>>>>>     table = pq.read_table("openskyflights.parquet")
>>>>>>   File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 1074, in read_table
>>>>>>     use_pandas_metadata=use_pandas_metadata)
>>>>>>   File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/filesystem.py", line 182, in read_parquet
>>>>>>     filesystem=self)
>>>>>>   File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 882, in __init__
>>>>>>     self.validate_schemas()
>>>>>>   File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 895, in validate_schemas
>>>>>>     self.schema = self.pieces[0].get_metadata(open_file).schema
>>>>>>   File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 453, in get_metadata
>>>>>>     return self._open(open_file_func).metadata
>>>>>>   File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 459, in _open
>>>>>>     reader = open_file_func(self.path)
>>>>>>   File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 984, in open_file
>>>>>>     common_metadata=self.common_metadata)
>>>>>>   File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 102, in __init__
>>>>>>     self.reader.open(source, metadata=metadata)
>>>>>>   File "pyarrow/_parquet.pyx", line 639, in pyarrow._parquet.ParquetReader.open
>>>>>>   File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
>>>>>> pyarrow.lib.ArrowIOError: Invalid parquet file. Corrupt footer.
>>>>>> 
>>>>>> Is there a way to achieve this?
>>>>>> I'm assuming that if I call writer.close() inside the while loop, it will
>>>>>> prevent any further data from being written to the file? Is there some
>>>>>> kind of "flush" operation that can be used to ensure all data is written
>>>>>> to disk and available to other processes or threads that want to read it?
>>>>>> 
>>>>>> Thanks
>>>>>> 
>>>> 
>>>> 
>>> 