mysql - Apache Airflow initdb fails on MySQL at kubernetes_resource_checkpointing

Tags: mysql airflow mysql-python

I want to use MySQL as the backend database for Apache Airflow. After installing the dependencies, I run

airflow initdb

Airflow starts setting up the database, but then fails with the following log:

shahbaz@OpenSource:~$ airflow initdb
[2019-07-11 12:01:13,726] {settings.py:182} INFO - 
settings.configure_orm(): Using pool settings. pool_size=5, 
pool_recycle=1800, pid=17492
[2019-07-11 12:01:13,917] {__init__.py:51} INFO - Using executor 
LocalExecutor
DB: mysql+mysqldb://airflow:***@localhost:3306/airflow
[2019-07-11 12:01:14,276] {db.py:350} INFO - Creating tables
INFO  [alembic.runtime.migration] Context impl MySQLImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade  -> e3a246e0dc1, 
current schema
INFO  [alembic.runtime.migration] Running upgrade e3a246e0dc1 -> 
1507a7289a2f, create is_encrypted
INFO  [alembic.runtime.migration] Running upgrade 1507a7289a2f -> 
13eb55f81627, maintain history for compatibility with earlier 
migrations
INFO  [alembic.runtime.migration] Running upgrade 13eb55f81627 -> 
338e90f54d61, More logging into task_instance
INFO  [alembic.runtime.migration] Running upgrade 338e90f54d61 -> 
52d714495f0, job_id indices
INFO  [alembic.runtime.migration] Running upgrade 52d714495f0 -> 
502898887f84, Adding extra to Log
INFO  [alembic.runtime.migration] Running upgrade 502898887f84 -> 
1b38cef5b76e, add dagrun
INFO  [alembic.runtime.migration] Running upgrade 1b38cef5b76e -> 
2e541a1dcfed, task_duration
INFO  [alembic.runtime.migration] Running upgrade 2e541a1dcfed -> 
40e67319e3a9, dagrun_config
INFO  [alembic.runtime.migration] Running upgrade 40e67319e3a9 -> 
561833c1c74b, add password column to user
INFO  [alembic.runtime.migration] Running upgrade 561833c1c74b -> 
4446e08588, dagrun start end
INFO  [alembic.runtime.migration] Running upgrade 4446e08588 -> 
bbc73705a13e, Add notification_sent column to sla_miss
INFO  [alembic.runtime.migration] Running upgrade bbc73705a13e -> 
bba5a7cfc896, Add a column to track the encryption state of the 
'Extra' field in connection
INFO  [alembic.runtime.migration] Running upgrade bba5a7cfc896 -> 
1968acfc09e3, add is_encrypted column to variable table
INFO  [alembic.runtime.migration] Running upgrade 1968acfc09e3 -> 
2e82aab8ef20, rename user table
INFO  [alembic.runtime.migration] Running upgrade 2e82aab8ef20 -> 
211e584da130, add TI state index
INFO  [alembic.runtime.migration] Running upgrade 211e584da130 -> 
64de9cddf6c9, add task fails journal table
INFO  [alembic.runtime.migration] Running upgrade 64de9cddf6c9 -> 
f2ca10b85618, add dag_stats table
INFO  [alembic.runtime.migration] Running upgrade f2ca10b85618 -> 
4addfa1236f1, Add fractional seconds to mysql tables
INFO  [alembic.runtime.migration] Running upgrade 4addfa1236f1 -> 
8504051e801b, xcom dag task indices
INFO  [alembic.runtime.migration] Running upgrade 8504051e801b -> 
5e7d17757c7a, add pid field to TaskInstance
INFO  [alembic.runtime.migration] Running upgrade 5e7d17757c7a -> 
127d2bf2dfa7, Add dag_id/state index on dag_run table
INFO  [alembic.runtime.migration] Running upgrade 127d2bf2dfa7 -> 
cc1e65623dc7, add max tries column to task instance
INFO  [alembic.runtime.migration] Running upgrade cc1e65623dc7 -> 
bdaa763e6c56, Make xcom value column a large binary
INFO  [alembic.runtime.migration] Running upgrade bdaa763e6c56 -> 
947454bf1dff, add ti job_id index
INFO  [alembic.runtime.migration] Running upgrade 947454bf1dff -> 
d2ae31099d61, Increase text size for MySQL (not relevant for other 
DBs' text types)
INFO  [alembic.runtime.migration] Running upgrade d2ae31099d61 -> 
0e2a74e0fc9f, Add time zone awareness
INFO  [alembic.runtime.migration] Running upgrade d2ae31099d61 -> 
33ae817a1ff4, kubernetes_resource_checkpointing
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1236, in _execute_context
    cursor, statement, parameters, context
File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/default.py", line 536, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 255, in execute
    self.errorhandler(self, exc, value)
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/connections.py", line 50, in defaulterrorhandler
    raise errorvalue
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 252, in execute
    res = self._query(query)
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 378, in _query
    db.query(q)
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/connections.py", line 280, in query
    _mysql.connection.query(self, query)
_mysql_exceptions.OperationalError: (3812, "An expression of non-boolean type specified to a check constraint 'kube_resource_version_one_row_id'.")

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/shahbaz/.local/bin/airflow", line 32, in <module>
    args.func(args)
  File "/usr/local/lib/python3.6/dist-packages/airflow/bin/cli.py", line 1096, in initdb
    db.initdb(settings.RBAC)
  File "/usr/local/lib/python3.6/dist-packages/airflow/utils/db.py", line 91, in initdb
    upgradedb()
  File "/usr/local/lib/python3.6/dist-packages/airflow/utils/db.py", line 358, in upgradedb
    command.upgrade(config, 'heads')
  File "/usr/local/lib/python3.6/dist-packages/alembic/command.py", line 254, in upgrade
    script.run_env()
  File "/usr/local/lib/python3.6/dist-packages/alembic/script/base.py", line 427, in run_env
    util.load_python_file(self.dir, 'env.py')
  File "/usr/local/lib/python3.6/dist-packages/alembic/util/pyfiles.py", line 81, in load_python_file
    module = load_module_py(module_id, path)
  File "/usr/local/lib/python3.6/dist-packages/alembic/util/compat.py", line 83, in load_module_py
    spec.loader.exec_module(module)
  File "<frozen importlib._bootstrap_external>", line 678, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/usr/local/lib/python3.6/dist-packages/airflow/migrations/env.py", line 92, in <module>
    run_migrations_online()
  File "/usr/local/lib/python3.6/dist-packages/airflow/migrations/env.py", line 86, in run_migrations_online
    context.run_migrations()
  File "<string>", line 8, in run_migrations
  File "/usr/local/lib/python3.6/dist-packages/alembic/runtime/environment.py", line 836, in run_migrations
    self.get_context().run_migrations(**kw)
  File "/usr/local/lib/python3.6/dist-packages/alembic/runtime/migration.py", line 330, in run_migrations
    step.migration_fn(**kw)
  File "/usr/local/lib/python3.6/dist-packages/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py", line 55, in upgrade
    *columns_and_constraints
  File "<string>", line 8, in create_table
  File "<string>", line 3, in create_table
  File "/usr/local/lib/python3.6/dist-packages/alembic/operations/ops.py", line 1120, in create_table
    return operations.invoke(op)
  File "/usr/local/lib/python3.6/dist-packages/alembic/operations/base.py", line 319, in invoke
    return fn(self, operation)
  File "/usr/local/lib/python3.6/dist-packages/alembic/operations/toimpl.py", line 101, in create_table
    operations.impl.create_table(table)
  File "/usr/local/lib/python3.6/dist-packages/alembic/ddl/impl.py", line 194, in create_table
    self._exec(schema.CreateTable(table))
  File "/usr/local/lib/python3.6/dist-packages/alembic/ddl/impl.py", line 118, in _exec
    return conn.execute(construct, *multiparams, **params)
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 980, in execute
    return meth(self, multiparams, params)
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/sql/ddl.py", line 72, in _execute_on_connection
    return connection._execute_ddl(self, multiparams, params)
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1042, in _execute_ddl
    compiled,
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1240, in _execute_context
    e, statement, parameters, cursor, context
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1458, in _handle_dbapi_exception
    util.raise_from_cause(sqlalchemy_exception, exc_info)
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/util/compat.py", line 296, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/util/compat.py", line 276, in reraise
    raise value.with_traceback(tb)
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1236, in _execute_context
    cursor, statement, parameters, context
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/default.py", line 536, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 255, in execute
    self.errorhandler(self, exc, value)
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/connections.py", line 50, in defaulterrorhandler
    raise errorvalue
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 252, in execute
    res = self._query(query)
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/cursors.py", line 378, in _query
    db.query(q)
  File "/usr/local/lib/python3.6/dist-packages/MySQLdb/connections.py", line 280, in query
    _mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (_mysql_exceptions.OperationalError) (3812, "An expression of non-boolean type specified to a check constraint 'kube_resource_version_one_row_id'.") [SQL: '\nCREATE TABLE kube_resource_version (\n\tone_row_id BOOL NOT NULL DEFAULT true, \n\tresource_version VARCHAR(255), \n\tPRIMARY KEY (one_row_id), \n\tCONSTRAINT kube_resource_version_one_row_id CHECK (one_row_id), \n\tCHECK (one_row_id IN (0, 1))\n)\n\n'] (Background on this error at: http://sqlalche.me/e/e3q8)

As you can see, the initdb command fails at the kubernetes_resource_checkpointing migration.

The last log entry shows that the cause is an OperationalError raised through sqlalchemy:

sqlalchemy.exc.OperationalError: (_mysql_exceptions.OperationalError) 
(3812, "An expression of non-boolean type specified to a check 
constraint 'kube_resource_version_one_row_id'.") [SQL: '\nCREATE TABLE 
kube_resource_version (\n\tone_row_id BOOL NOT NULL DEFAULT true, 
\n\tresource_version VARCHAR(255), \n\tPRIMARY KEY (one_row_id), 
\n\tCONSTRAINT kube_resource_version_one_row_id CHECK (one_row_id), 
\n\tCHECK (one_row_id IN (0, 1))\n)\n\n'] (Background on this error 
at: http://sqlalche.me/e/e3q8)

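I should mention that I can run apache-airflow with a Postgres database; I am only using Airflow with Postgres because it behaves strangely with MySQL.

I am using:

apache-airflow version 1.10.3

MySQL version 8.0.16 (MySQL Community Server - GPL)

I also tried setting SQL_MODE to "ANSI" for MySQL, as the Airflow documentation suggests, but to no avail.

Any help would be much appreciated.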

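[Edit]

Thanks to "skadya" for pointing out the issue link. Let me share my findings. I looked at the code files that 'Shi Chen' pointed out; two files are responsible for this behavior: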

33ae817a1ff4_add_kubernetes_resource_checkpointing.py
86770d1215c0_add_kubernetes_scheduler_uniqueness.py

Both are migration files that use the alembic and sqlalchemy libraries. I found the following sqlalchemy code in 33ae817a1ff4_add_kubernetes_resource_checkpointing.py:

def upgrade():

    columns_and_constraints = [
        sa.Column("one_row_id", sa.Boolean, server_default=sa.true(), primary_key=True),
        sa.Column("resource_version", sa.String(255))
    ]

    conn = op.get_bind()

    # alembic creates an invalid SQL for mssql dialect
    if conn.dialect.name not in ('mssql'):
        columns_and_constraints.append(sa.CheckConstraint("one_row_id", name="kube_resource_version_one_row_id"))

    table = op.create_table(
        RESOURCE_TABLE,
        *columns_and_constraints
    )

    op.bulk_insert(table, [
        {"resource_version": ""}
    ])

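This is rendered as the following SQL query, which MySQL rejects because CHECK (one_row_id) is not a boolean expression:

CREATE TABLE 
kube_resource_version (one_row_id BOOL NOT NULL DEFAULT true, 
resource_version VARCHAR(255), PRIMARY KEY (one_row_id), 
CONSTRAINT kube_resource_version_one_row_id CHECK (one_row_id), 
CHECK (one_row_id IN (0, 1)))

Instead, the SQL query should look like this: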

CREATE TABLE 
kube_resource_version (one_row_id BOOL NOT NULL DEFAULT true, 
resource_version VARCHAR(255), PRIMARY KEY (one_row_id), 
CONSTRAINT kube_resource_version_one_row_id CHECK (one_row_id IN (0, 
1)))

The link provided by "skadya" was helpful, and I got the system working after changing the code in the two files mentioned above.

You only need to change the following code

if conn.dialect.name not in ('mssql'):
    columns_and_constraints.append(
        sa.CheckConstraint("one_row_id", name="kube_resource_version_one_row_id")
    )

to

if conn.dialect.name not in ('mssql', 'mysql'):
    columns_and_constraints.append(
        sa.CheckConstraint("one_row_id", name="kube_resource_version_one_row_id")
    )
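One detail of the dialect check is easy to miss: in Python, ('mssql') without a trailing comma is a parenthesized string, not a one-element tuple, so "not in ('mssql')" performs a substring test. That happens to work for the dialect names involved here, but a real tuple states the intent more safely. The sketch below only illustrates the difference; needs_check_constraint and SKIPPED_DIALECTS are hypothetical names and are not part of Airflow.

```python
# Hypothetical helper, not part of Airflow: decide whether a migration
# should add the CHECK constraint for a given SQLAlchemy dialect name.
SKIPPED_DIALECTS = ("mssql", "mysql")  # a real tuple of dialect names


def needs_check_constraint(dialect_name):
    """Return True if the dialect is not in the skip list."""
    return dialect_name not in SKIPPED_DIALECTS


# ('mssql') without a trailing comma is a plain string ...
assert isinstance(("mssql"), str)
# ... so "in" does substring matching rather than membership testing.
assert "sql" in ("mssql")
# With a one-element tuple, "in" tests membership as intended.
assert "sql" not in ("mssql",)

print(needs_check_constraint("postgresql"))  # True
print(needs_check_constraint("mysql"))       # False
```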

Best answer

There is an unresolved bug in the Airflow issue tracker:

https://issues.apache.org/jira/browse/AIRFLOW-4995

As a workaround, you can manually apply the changes suggested in the pull request.

Update: this bug has been fixed in Airflow 1.10.4.
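To judge whether a given setup is likely to hit this error, it can help to compare version numbers. MySQL started enforcing CHECK constraints in 8.0.16 (earlier releases parsed and ignored them), and the migration was fixed in Airflow 1.10.4. The following is a minimal sketch under those assumptions; parse_version and is_affected are hypothetical helpers, the Airflow release that first shipped this migration is deliberately not modeled, and MariaDB version strings are out of scope.

```python
import re


def parse_version(text):
    """Extract the leading numeric parts, e.g. '8.0.16-log' -> (8, 0, 16)."""
    match = re.match(r"(\d+)\.(\d+)\.(\d+)", text)
    if match is None:
        raise ValueError("unrecognized version string: %r" % text)
    return tuple(int(part) for part in match.groups())


def is_affected(airflow_version, mysql_version):
    """Rough check: Airflow before the 1.10.4 fix, on a MySQL server
    that enforces CHECK constraints (8.0.16 or later).

    Assumption: the Airflow version already contains the failing
    migration; no lower bound is checked here.
    """
    return (
        parse_version(airflow_version) < (1, 10, 4)
        and parse_version(mysql_version) >= (8, 0, 16)
    )


print(is_affected("1.10.3", "8.0.16"))  # True: the combination in the question
print(is_affected("1.10.4", "8.0.16"))  # False: fixed Airflow release
print(is_affected("1.10.3", "5.7.26"))  # False: CHECK clauses are ignored
```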

Regarding "mysql - Apache Airflow initdb fails on MySQL at kubernetes_resource_checkpointing", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/56988054/
