
Problem with SparkSubmit

Hi all,
I have a problem with the Spark operator. I get this exception:

user@host:/# airflow test myDAG myTask 2018-04-26
[2018-04-26 15:32:11,279] {} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/Grammar.txt
[2018-04-26 15:32:11,323] {} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/PatternGrammar.txt
[2018-04-26 15:32:11,456] {} INFO - Using executor SequentialExecutor
[2018-04-26 15:32:11,535] {} INFO - Filling up the DagBag from /usr/local/airflow/dags
[2018-04-26 15:32:11,811] {} INFO - Using connection to: sparkhost
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
  File "/usr/local/lib/python3.5/dist-packages/airflow/bin/", line 528, in test, ignore_ti_state=True, test_mode=True)
  File "/usr/local/lib/python3.5/dist-packages/airflow/utils/", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python3.5/dist-packages/airflow/", line 1584, in run
  File "/usr/local/lib/python3.5/dist-packages/airflow/utils/", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python3.5/dist-packages/airflow/", line 1493, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.5/dist-packages/airflow/contrib/operators/", line 145, in execute
  File "/usr/local/lib/python3.5/dist-packages/airflow/contrib/hooks/", line 231, in submit
  File "/usr/lib/python3.5/", line 947, in __init__
    restore_signals, start_new_session)
  File "/usr/lib/python3.5/", line 1490, in _execute_child
    restore_signals, start_new_session, preexec_fn)
TypeError: Can't convert 'list' object to str implicitly
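For context: CPython raises this TypeError when the argument list handed to subprocess.Popen contains an element that is itself a list instead of a string. A minimal reproduction (the nested-list argument here is my own illustration, not taken from the Airflow hook):

```python
import subprocess

# Popen expects every element of the args sequence to be a str (or bytes);
# a nested list fails inside _execute_child, as in the traceback above.
try:
    subprocess.Popen(["echo", ["not-a-string"]])
except TypeError as exc:
    # On Python 3.5 the message reads: Can't convert 'list' object to str implicitly
    print(type(exc).__name__)  # → TypeError
```
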

My DAG looks like:

import os

from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

default_args = {
    'owner': 'spark',
    'depends_on_past': False,
    'start_date': datetime(2018, 4, 26),  # a start_date is required for tasks
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG('myDAG', default_args=default_args)

connection_id = "SPARK"
os.environ['AIRFLOW_CONN_%s' % connection_id] = 'spark://sparkhost:7077'

_config = {
    'jars': 'spark_job.jar',
    'executor_memory': '2g',
    'name': 'myJob',
    'conn_id': connection_id,
}

operator = SparkSubmitOperator(
    task_id='myTask',
    dag=dag,
    **_config
)
What is wrong? Could somebody help me with it?