
Re: TensorFlow Hub - can't seem to load module in Airflow


Sure thing.

Attached is a minimal example.

The error I get is:

[2018-06-19 09:44:23,133] {cli.py:374} INFO - Running on host airflow-worker-f796f6bd-7qzwc
[2018-06-19 09:44:23,577] {models.py:1196} INFO - Dependencies all met for <TaskInstance: dev_dag.encode_posts 2018-06-19 09:15:00 [queued]>
[2018-06-19 09:44:23,813] {models.py:1196} INFO - Dependencies all met for <TaskInstance: dev_dag.encode_posts 2018-06-19 09:15:00 [queued]>
[2018-06-19 09:44:23,824] {models.py:1406} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 6
--------------------------------------------------------------------------------

[2018-06-19 09:44:24,188] {models.py:1427} INFO - Executing <Task(PythonOperator): encode_posts> on 2018-06-19 09:15:00
[2018-06-19 09:44:24,222] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run dev_dag encode_posts 2018-06-19T09:15:00 --job_id 5753 --raw -sd DAGS_FOLDER/dev_dag.py']
[2018-06-19 09:45:11,207] {base_task_runner.py:98} INFO - Subtask: [2018-06-19 09:45:11,164] {__init__.py:45} INFO - Using executor CeleryExecutor
[2018-06-19 09:45:13,200] {base_task_runner.py:98} INFO - Subtask: [2018-06-19 09:45:13,168] {models.py:189} INFO - Filling up the DagBag from /home/airflow/gcs/dags/dev_dag.py
[2018-06-19 09:45:13,222] {base_task_runner.py:98} INFO - Subtask: /usr/local/lib/python2.7/site-packages/airflow/utils/helpers.py:351: DeprecationWarning: Importing DummyOperator directly from <module 'airflow.operators' from '/usr/local/lib/python2.7/site-packages/airflow/operators/__init__.pyc'> has been deprecated. Please import from '<module 'airflow.operators' from '/usr/local/lib/python2.7/site-packages/airflow/operators/__init__.pyc'>.[operator_module]' instead. Support for direct imports will be dropped entirely in Airflow 2.0.
[2018-06-19 09:45:13,224] {base_task_runner.py:98} INFO - Subtask:   DeprecationWarning)
[2018-06-19 09:45:27,573] {base_task_runner.py:98} INFO - Subtask: [2018-06-19 09:45:27,571] {dev_dag.py:52} INFO - ... begin - get module from tf-hub ...
[2018-06-19 09:45:28,228] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent call last):
[2018-06-19 09:45:28,230] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/bin/airflow", line 27, in <module>
[2018-06-19 09:45:28,232] {base_task_runner.py:98} INFO - Subtask:     args.func(args)
[2018-06-19 09:45:28,232] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
[2018-06-19 09:45:28,233] {base_task_runner.py:98} INFO - Subtask:     pool=args.pool,
[2018-06-19 09:45:28,233] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
[2018-06-19 09:45:28,234] {base_task_runner.py:98} INFO - Subtask:     result = func(*args, **kwargs)
[2018-06-19 09:45:28,234] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
[2018-06-19 09:45:28,250] {base_task_runner.py:98} INFO - Subtask:     result = task_copy.execute(context=context)
[2018-06-19 09:45:28,250] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/operators/python_operator.py", line 89, in execute
[2018-06-19 09:45:28,251] {base_task_runner.py:98} INFO - Subtask:     return_value = self.execute_callable()
[2018-06-19 09:45:28,251] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/operators/python_operator.py", line 94, in execute_callable
[2018-06-19 09:45:28,252] {base_task_runner.py:98} INFO - Subtask:     return self.python_callable(*self.op_args, **self.op_kwargs)
[2018-06-19 09:45:28,252] {base_task_runner.py:98} INFO - Subtask:   File "/home/airflow/gcs/dags/dev_dag.py", line 53, in fn_encode_posts
[2018-06-19 09:45:28,253] {base_task_runner.py:98} INFO - Subtask:     embed = hub.Module("https://tfhub.dev/google/nnlm-en-dim50/1")
[2018-06-19 09:45:28,253] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/tensorflow_hub/module.py", line 105, in __init__
[2018-06-19 09:45:28,276] {base_task_runner.py:98} INFO - Subtask:     self._spec = as_module_spec(spec)
[2018-06-19 09:45:28,277] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/tensorflow_hub/module.py", line 31, in as_module_spec
[2018-06-19 09:45:28,278] {base_task_runner.py:98} INFO - Subtask:     return native_module.load_module_spec(spec)
[2018-06-19 09:45:28,278] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/tensorflow_hub/native_module.py", line 99, in load_module_spec
[2018-06-19 09:45:28,280] {base_task_runner.py:98} INFO - Subtask:     path = compressed_module_resolver.get_default().get_module_path(path)
[2018-06-19 09:45:28,280] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/tensorflow_hub/resolver.py", line 385, in get_module_path
[2018-06-19 09:45:28,295] {base_task_runner.py:98} INFO - Subtask:     return self._get_module_path(handle)
[2018-06-19 09:45:28,296] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/tensorflow_hub/resolver.py", line 467, in _get_module_path
[2018-06-19 09:45:28,297] {base_task_runner.py:98} INFO - Subtask:     return resolver.get_module_path(handle)
[2018-06-19 09:45:28,297] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/tensorflow_hub/resolver.py", line 385, in get_module_path
[2018-06-19 09:45:28,298] {base_task_runner.py:98} INFO - Subtask:     return self._get_module_path(handle)
[2018-06-19 09:45:28,299] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/tensorflow_hub/compressed_module_resolver.py", line 105, in _get_module_path
[2018-06-19 09:45:28,342] {base_task_runner.py:98} INFO - Subtask:     self._lock_file_timeout_sec())
[2018-06-19 09:45:28,343] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/tensorflow_hub/resolver.py", line 313, in atomic_download
[2018-06-19 09:45:28,343] {base_task_runner.py:98} INFO - Subtask:     download_fn(handle, tmp_dir)
[2018-06-19 09:45:28,344] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/tensorflow_hub/compressed_module_resolver.py", line 86, in download
[2018-06-19 09:45:28,345] {base_task_runner.py:98} INFO - Subtask:     request = url.Request(_append_compressed_format_query(handle))
[2018-06-19 09:45:28,345] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/tensorflow_hub/compressed_module_resolver.py", line 62, in _append_compressed_format_query
[2018-06-19 09:45:28,346] {base_task_runner.py:98} INFO - Subtask:     return urlparse.urlunparse(parsed)
[2018-06-19 09:45:28,346] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/future/backports/urllib/parse.py", line 387, in urlunparse
[2018-06-19 09:45:28,368] {base_task_runner.py:98} INFO - Subtask:     _coerce_args(*components))
[2018-06-19 09:45:28,370] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/future/backports/urllib/parse.py", line 115, in _coerce_args
[2018-06-19 09:45:28,371] {base_task_runner.py:98} INFO - Subtask:     raise TypeError("Cannot mix str and non-str arguments")
[2018-06-19 09:45:28,373] {base_task_runner.py:98} INFO - Subtask: TypeError: Cannot mix str and non-str arguments
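The last frames show `urlunparse` from the `future` package's backport of Python 3's `urllib.parse` rejecting the URL pieces that tensorflow_hub reassembles. That backport copies the Python 3 rule that all components must be text or all must be bytes; under Python 2.7 the clash would be between native byte `str` and `unicode`. A minimal Python 3 sketch of the same check follows. The helper name `try_urlunparse` and the component values are illustrative assumptions, not tensorflow_hub's internals.

```python
from urllib.parse import urlunparse


def try_urlunparse(components):
    """Return urlunparse's result, or the TypeError message if it rejects the parts."""
    try:
        return urlunparse(components)
    except TypeError as exc:
        return "TypeError: {}".format(exc)


# All components are text: accepted.
all_text = ("https", "tfhub.dev", "/google/nnlm-en-dim50/1", "", "tf-hub-format=compressed", "")
# One bytes component among text ones: rejected with the error seen in the traceback.
mixed = ("https", "tfhub.dev", b"/google/nnlm-en-dim50/1", "", "tf-hub-format=compressed", "")

print(try_urlunparse(all_text))  # https://tfhub.dev/google/nnlm-en-dim50/1?tf-hub-format=compressed
print(try_urlunparse(mixed))     # TypeError: Cannot mix str and non-str arguments
```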

I'm running this on Google Cloud Composer, which I believe is Airflow 1.9.

Cheers,
Andy

On Tue, Jun 19, 2018 at 10:06 AM Ash Berlin-Taylor <ash_airflowlist@xxxxxxxxxxxxxx> wrote:
There's nothing directly in Airflow itself that I can think of that would cause this kind of issue.

It really depends on what the PythonOperator in your DAG does. Can you share that code?

-ash

> On 19 Jun 2018, at 10:01, Andrew Maguire <andrewm4894@xxxxxxxxx> wrote:
>
> Hi All,
>
> Just wondering if anyone might have deeper insight into what, if anything,
> Airflow-related might be causing this issue
> <https://github.com/tensorflow/hub/issues/76>.
>
> When I try to load a TensorFlow Hub module within an Airflow operator, I get
> the error in that issue.
>
> It works fine if I just run the Python script myself.
>
> The best I could figure out was that something Airflow was doing didn't agree
> with something TensorFlow Hub was expecting, and I'm not really sure if there
> is anything I could do to resolve it.
>
> Cheers,
> Andy
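One way to test the hunch that the Airflow task process differs from a plain `python` run is to log which module answers to `urllib.parse` in both places. Under Python 2.7 that name normally does not exist at all; it appears only if something such as `future`'s `standard_library.install_aliases()` has registered the backport. Whether Airflow 1.9 triggers this in the task process is an assumption to check, not something the thread establishes. The helper names `describe_module` and `log_url_modules` are hypothetical.

```python
import importlib
import logging


def describe_module(name):
    """Return '<name> -> <file>' for an importable module, or a 'not importable' note."""
    try:
        module = importlib.import_module(name)
    except ImportError:
        return "{} -> not importable".format(name)
    return "{} -> {}".format(name, getattr(module, "__file__", "<no __file__>"))


def log_url_modules():
    """Log where urllib.parse and urlparse come from in the current interpreter."""
    for name in ("urllib.parse", "urlparse"):
        logging.info(describe_module(name))
```

Calling `log_url_modules()` at the top of `fn_encode_posts` and once from a plain interpreter, then comparing the task log with the console output, would show whether the task picks up `future/backports/urllib/parse.py` while the standalone run does not.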

# -*- coding: utf-8 -*-

"""
tf_hub_encode_posts
"""

#########################################################
# SET UP
#########################################################

from datetime import datetime, timedelta
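from airflow import DAG
# import from the operator module; `from airflow.operators import DummyOperator` is deprecated
from airflow.operators.dummy_operator import DummyOperator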
from airflow.operators.python_operator import PythonOperator
import tensorflow as tf
import tensorflow_hub as hub
import pandas as pd 
import logging


#########################################################
# DEFINE DAG
#########################################################

# default arguments applied to every task in the DAG
# (catchup is a DAG-level setting, so it is passed to DAG() below instead)
default_args = dict(
      owner = 'Airflow',
      depends_on_past = False,
      retries = 5,
      retry_delay = timedelta(minutes=2)
    )

# Define the DAG
dag = DAG(dag_id='dev_dag',
          description='dev_dag',
          start_date=datetime.strptime('2018-06-18', '%Y-%m-%d'),
          #schedule_interval='*/15 * * * *',
          schedule_interval='@once',
          default_args=default_args,
          catchup = False
          )

#########################################################
# PYTHON DAG FUNCTIONS
#########################################################

def fn_encode_posts(**kwargs):

    tf.logging.set_verbosity(tf.logging.ERROR)

    # Load the NNLM English text-embedding module (50 dimensions) from TF Hub
    logging.info('... begin - get module from tf-hub ...')
    embed = hub.Module("https://tfhub.dev/google/nnlm-en-dim50/1")
    logging.info('... done - get module from tf-hub ...')

    posts = ['a sentence to embed', 'another sentence that should have a similar embedding']

    # run tf session to get embedding
    with tf.Session() as session:
        session.run([tf.global_variables_initializer(), tf.tables_initializer()])
        post_embeddings = session.run(embed(posts))

    # wrangle df
    logging.info('... save to df ...')
    df_out = pd.DataFrame(
        post_embeddings,
        columns=['dim_{}'.format(i) for i in range(1, post_embeddings.shape[1] + 1)])

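    logging.info('... look at df ...')
    # DataFrame.info() prints to stdout and returns None, so capture its output in a buffer
    try:
        from StringIO import StringIO  # Python 2
    except ImportError:
        from io import StringIO  # Python 3
    buf = StringIO()
    df_out.info(buf=buf)
    logging.info(buf.getvalue())
    logging.info(df_out.head().T)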


#########################################################
# TASKS
#########################################################


# Set DummyOperator
kick_off_dag = DummyOperator(task_id='kick_off_dag',dag=dag)
    
#----------------------------------#
# encode_posts
#----------------------------------#
    
encode_posts = PythonOperator(
    task_id='encode_posts',
    provide_context=True,
    python_callable=fn_encode_posts,
    dag = dag,
    priority_weight = 100
    )

kick_off_dag >> encode_posts
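A possible workaround, offered as an untested sketch rather than a confirmed fix: download the module archive once with TF Hub's documented `?tf-hub-format=compressed` query, extract it to a local directory, and pass that directory to `hub.Module()`, which accepts local paths. That way the URL is never rebuilt by the `urlunparse` call in the traceback. The helper names `compressed_url` and `fetch_module`, the use of `tfhub_module.pb` as a "module already extracted" marker, and the assumption that the archive unpacks with the module files at its top level are all assumptions of this sketch.

```python
import os
import shutil
import tarfile
import tempfile

try:  # Python 2.7, as on the Composer workers in this thread
    from urllib2 import urlopen
except ImportError:  # Python 3
    from urllib.request import urlopen

# Query string TF Hub uses to serve a module as a .tar.gz archive.
COMPRESSED_FORMAT_QUERY = "tf-hub-format=compressed"


def compressed_url(handle):
    """Append the compressed-format query by plain string concatenation (no urlparse)."""
    separator = "&" if "?" in handle else "?"
    return handle + separator + COMPRESSED_FORMAT_QUERY


def fetch_module(handle, dest_dir):
    """Download and extract a TF1 Hub module into dest_dir unless it is already there.

    Returns dest_dir so it can be passed straight to hub.Module().
    """
    # Assumption: an extracted TF1 Hub module directory contains tfhub_module.pb.
    if os.path.isfile(os.path.join(dest_dir, "tfhub_module.pb")):
        return dest_dir
    fd, archive_path = tempfile.mkstemp(suffix=".tar.gz")
    os.close(fd)
    try:
        response = urlopen(compressed_url(handle))
        try:
            with open(archive_path, "wb") as archive:
                shutil.copyfileobj(response, archive)  # stream the tarball to disk
        finally:
            response.close()
        if not os.path.isdir(dest_dir):
            os.makedirs(dest_dir)
        tar = tarfile.open(archive_path, "r:gz")
        try:
            tar.extractall(dest_dir)
        finally:
            tar.close()
    finally:
        os.remove(archive_path)  # always clean up the temporary archive
    return dest_dir
```

Inside `fn_encode_posts` the call would then become `embed = hub.Module(fetch_module("https://tfhub.dev/google/nnlm-en-dim50/1", "/tmp/nnlm-en-dim50-1"))`, where the cache path is a hypothetical choice. Concurrent tasks on the same worker could race on that directory; extracting to a temporary directory and renaming it into place would avoid that.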

