
Problem with SparkSubmit


Hi all,
I have a problem with the Spark submit operator. I get the following exception:

user@host:/# airflow test myDAG myTask 2018-04-26
[2018-04-26 15:32:11,279] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/Grammar.txt
[2018-04-26 15:32:11,323] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/PatternGrammar.txt
[2018-04-26 15:32:11,456] {__init__.py:45} INFO - Using executor SequentialExecutor
[2018-04-26 15:32:11,535] {models.py:189} INFO - Filling up the DagBag from /usr/local/airflow/dags
[2018-04-26 15:32:11,811] {base_hook.py:80} INFO - Using connection to: sparkhost
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python3.5/dist-packages/airflow/bin/cli.py", line 528, in test
    ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
  File "/usr/local/lib/python3.5/dist-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python3.5/dist-packages/airflow/models.py", line 1584, in run
    session=session)
  File "/usr/local/lib/python3.5/dist-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python3.5/dist-packages/airflow/models.py", line 1493, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.5/dist-packages/airflow/contrib/operators/spark_submit_operator.py", line 145, in execute
    self._hook.submit(self._application)
  File "/usr/local/lib/python3.5/dist-packages/airflow/contrib/hooks/spark_submit_hook.py", line 231, in submit
    **kwargs)
  File "/usr/lib/python3.5/subprocess.py", line 947, in __init__
    restore_signals, start_new_session)
  File "/usr/lib/python3.5/subprocess.py", line 1490, in _execute_child
    restore_signals, start_new_session, preexec_fn)
TypeError: Can't convert 'list' object to str implicitly

My DAG looks like this:

import os
from datetime import datetime, timedelta
from airflow import DAG
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

default_args = {
    'owner': 'spark',
    'depends_on_past': False,
    'start_date': datetime.now(),
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
}

dag = DAG('myDAG', default_args=default_args)

connection_id = "SPARK"
os.environ[('AIRFLOW_CONN_%s' % connection_id)] = 'spark://sparkhost:7077'

_config = {
    'jars': 'spark_job.jar',
    'executor_memory': '2g',
    'name': 'myJob',
    'conn_id': connection_id,
    'java_class': 'org.Job'
}

operator = SparkSubmitOperator(
    task_id='myTask',
    dag=dag,
    **_config
)

What is wrong? Could somebody help me with it?


