
Pipeline passes on the local runner but fails on the Dataflow runner - help with an error


Hello,

I am running the following pipeline on the local runner with no issues.

logging.info('Define the pipeline')
p = beam.Pipeline(options=options)
samplePath = outputPath
ExploreData = (p
               | 'Extract the rows from dataframe' >> beam.io.Read(
                     beam.io.BigQuerySource('archs4.Debug_annotation'))
               | 'create more columns' >> beam.ParDo(
                     CreateColForSampleFn(colListSubset, outputPath)))
(ExploreData | 'writing to TSV files' >> beam.io.WriteToText(
    'gs://archs4/output/dataExploration.txt',
    file_name_suffix='.tsv',
    num_shards=1,
    append_trailing_newlines=True,
    header=colListStrHeader))
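
For completeness, options is built roughly like this; the project and bucket
names below are placeholders, and I only switch the runner between the local
and Dataflow runs:

from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    runner='DirectRunner',                   # 'DataflowRunner' for the failing runs
    project='my-project',                    # placeholder
    temp_location='gs://archs4/tmp',         # placeholder
    staging_location='gs://archs4/staging',  # placeholder
)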


Running on Dataflow raises the error below. I don't have any idea where to look for the issue: the traceback doesn't point at my pipeline code but at the Apache Beam modules.
I will try debugging by elimination, starting with the round-trip check sketched below. Please let me know if you have any direction for me.
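
As a first elimination step, I plan to round-trip the DoFn through Beam's
pickler locally, the same way the worker deserializes it. This assumes
CreateColForSampleFn and its arguments are importable in the same session
(note that pickler is an internal Beam module):

from apache_beam.internal import pickler

fn = CreateColForSampleFn(colListSubset, outputPath)
payload = pickler.dumps(fn)        # what gets serialized and shipped to workers
restored = pickler.loads(payload)  # what the worker runs at DoOperation.start

If this raises the same ImportError, the problem is in what the DoFn instance
captures (for example, pandas objects stored on self) rather than in the
pipeline wiring.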

Many thanks,
Eila


======================================================
DataflowRuntimeExceptionTraceback (most recent call last)
<ipython-input-151-1e5aeb8b7d9b> in <module>()
----> 1 p.run().wait_until_finish()

/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.pyc in wait_until_finish(self, duration)
    776         raise DataflowRuntimeException(
    777             'Dataflow pipeline failed. State: %s, Error:\n%s' %
--> 778             (self.state, getattr(self._runner, 'last_error_msg', None)), self)
    779     return self.state
    780 

DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 581, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 166, in execute
    op.start()
  File "dataflow_worker/operations.py", line 283, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:10680)
    def start(self):
  File "dataflow_worker/operations.py", line 284, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:10574)
    with self.scoped_start_state:
  File "dataflow_worker/operations.py", line 289, in dataflow_worker.operations.DoOperation.start (dataflow_worker/operations.c:9775)
    pickler.loads(self.spec.serialized_fn))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 225, in loads
    return dill.loads(s)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 277, in loads
    return load(file)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in load
    obj = pik.load()
  File "/usr/lib/python2.7/pickle.py", line 858, in load
    dispatch[key](self)
  File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 423, in find_class
    return StockUnpickler.find_class(self, module, name)
  File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
    __import__(module)
ImportError: No module named indexes.base
======================================================
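
PS: reading the traceback again, the worker dies inside
pickler.loads(self.spec.serialized_fn), i.e. while unpickling my DoFn, and
indexes.base looks like the pandas-internal module path pandas.indexes.base,
which only exists in pandas older than 0.20. So one hypothesis is that the
pandas version on my local machine (where the DoFn was pickled) differs from
the one on the Dataflow workers. Below is a minimal sketch of pinning the
worker packages to match my local environment, assuming the runner picks up a
requirements file; the version is a placeholder, and I would use whatever
pip show pandas reports locally:

# requirements.txt (placeholder pin):
#   pandas==0.19.2

from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    runner='DataflowRunner',
    project='my-project',                  # placeholder
    temp_location='gs://archs4/tmp',       # placeholder
    requirements_file='requirements.txt',  # staged and installed on each worker
)

Alternatively, upgrading or downgrading pandas locally so both sides match
should work equally well.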
