
Re: How to append to parquet file periodically and read intermediate data - pyarrow.lib.ArrowIOError: Invalid parquet file. Corrupt footer.


> [Cross posted from https://github.com/apache/arrow/issues/3203]
>
> I'm adding new data to a parquet file every 60 seconds using this code:
>
> import os
> import json
> import time
> import requests
> import pandas as pd
> import numpy as np
> import pyarrow as pa
> import pyarrow.parquet as pq
>
> api_url = 'https://opensky-network.org/api/states/all'
>
> cols = ['icao24', 'callsign', 'origin', 'time_position',
>         'last_contact', 'longitude', 'latitude',
>         'baro_altitude', 'on_ground', 'velocity', 'true_track',
>         'vertical_rate', 'sensors', 'geo_altitude', 'squawk',
>         'spi', 'position_source']
>
> def get_new_flight_info(writer):
>     print("Requesting new data")
>     req = requests.get(api_url)
>     content = req.json()
>
>     states = content['states']
>     df = pd.DataFrame(states, columns=cols)
>     df['timestamp'] = content['time']
>     print("Found {} new items".format(len(df)))
>
>     table = pa.Table.from_pandas(df)
>     # Create the writer lazily, so the schema comes from the first batch
>     if writer is None:
>         writer = pq.ParquetWriter('openskyflights.parquet', table.schema)
>     writer.write_table(table=table)
>     return writer
>
> if __name__ == '__main__':
>     writer = None
>     while not os.path.exists('opensky.STOP'):
>         writer = get_new_flight_info(writer)
>         time.sleep(60)
>
>     if writer:
>         writer.close()
>
> This works fine and the file grows every 60 seconds.
> However, unless I force the loop to exit, I am unable to read the
> parquet file. In a separate terminal I try to access it with this
> code:
>
> import pandas as pd
> import pyarrow.parquet as pq
>
> table = pq.read_table("openskyflights.parquet")
> df = table.to_pandas()
> print(len(df))
>
> which results in this error:
>
> Traceback (most recent call last):
>   File "checkdownloadsize.py", line 7, in <module>
>     table = pq.read_table("openskyflights.parquet")
>   File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 1074, in read_table
>     use_pandas_metadata=use_pandas_metadata)
>   File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/filesystem.py", line 182, in read_parquet
>     filesystem=self)
>   File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 882, in __init__
>     self.validate_schemas()
>   File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 895, in validate_schemas
>     self.schema = self.pieces[0].get_metadata(open_file).schema
>   File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 453, in get_metadata
>     return self._open(open_file_func).metadata
>   File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 459, in _open
>     reader = open_file_func(self.path)
>   File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 984, in open_file
>     common_metadata=self.common_metadata)
>   File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 102, in __init__
>     self.reader.open(source, metadata=metadata)
>   File "pyarrow/_parquet.pyx", line 639, in pyarrow._parquet.ParquetReader.open
>   File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
> pyarrow.lib.ArrowIOError: Invalid parquet file. Corrupt footer.
>
> Is there a way to achieve this?
> I assume that if I call writer.close() inside the while loop, it will
> prevent any further data from being written to the file? Is there some
> kind of "flush" operation that ensures all data is written to disk and
> available to other processes or threads that want to read it?
>
> Thanks
>