rabbitmq - Airflow Scheduler 和 Web 服务器在对 RabbitMQ 上运行的任务进行排队时挂起

标签 rabbitmq celery airflow

我正在努力让 Airflow 工作人员运行任务。我开始服务:

airflow worker --debug
airflow webserver
airflow scheduler
airflow flower #to check celery queues in UI at localhost:5555

这些进程运行良好,但是当调度程序将要运行的任务添加到队列中或者当我尝试从 Airflow UI 运行任务时,调度程序和网络服务器被挂起 - 连续加载不再继续 - 同时添加任务到队列: Scheduler hanging Airflow UI Webserver hanging Workers are not receiving tasks from the rabbitmq queue Flower server is hanging

我认为问题与调度程序/网络服务器和队列之间的通信有关。我在airflow.cfg 文件中与代理相关的设置是:broker_url = amqp://guest:***@ksaprice_rabbitmq:15672// - 我也尝试过:broker_url = pyamqp://guest:***@ksaprice_rabbitmq:15672// 。 rabbitmq 服务器运行良好,我也测试了登录名和密码凭据。

我使用的版本是:

  • Airflow ==1.8.1
  • celery =4.1
  • rabbitmq 服务器 3.6

我是 Airflow 和 Rabbitmq 的新手。

更新: 我的排队问题已通过 @Jean-Sébastien Pédron 的回答解决,但我的工作人员仍然没有执行任务,并且花朵没有显示工作人员,尽管 airflow worker服务正在8793端口运行。

Rabbitmq 报告:

Status of node ksaprice_rabbitmq@4eed789778c0
[{pid,233},
 {running_applications,
     [{rabbitmq_management,"RabbitMQ Management Console","3.6.10"},
      {rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.6.10"},
      {rabbitmq_management_agent,"RabbitMQ Management Agent","3.6.10"},
      {amqp_client,"RabbitMQ AMQP Client","3.6.10"},
      {cowboy,"Small, fast, modular HTTP server.","1.0.4"},
      {cowlib,"Support library for manipulating Web protocols.","1.0.2"},
      {inets,"INETS  CXC 138 49","6.3.4"},
      {rabbit,"RabbitMQ","3.6.10"},
      {mnesia,"MNESIA  CXC 138 12","4.14.2"},
      {rabbit_common,
          "Modules shared by rabbitmq-server and rabbitmq-erlang-client",
          "3.6.10"},
      {compiler,"ERTS  CXC 138 10","7.0.3"},
      {os_mon,"CPO  CXC 138 46","2.4.1"},
      {ranch,"Socket acceptor pool for TCP protocols.","1.3.0"},
      {ssl,"Erlang/OTP SSL application","8.1"},
      {public_key,"Public key infrastructure","1.3"},
      {crypto,"CRYPTO","3.7.2"},
      {xmerl,"XML parser","1.3.12"},
      {asn1,"The Erlang ASN1 compiler version 4.0.4","4.0.4"},
      {syntax_tools,"Syntax tools","2.1.1"},
      {sasl,"SASL  CXC 138 11","3.0.2"},
      {stdlib,"ERTS  CXC 138 10","3.2"},
      {kernel,"ERTS  CXC 138 10","5.1.1"}]},
 {os,{unix,linux}},
 {erlang_version,
     "Erlang/OTP 19 [erts-8.2.1] [source] [64-bit] [smp:2:2] [async-threads:64] [hipe] [kernel-poll:true]\n"},
 {memory,
     [{total,77018832},
      {connection_readers,334888},
      {connection_writers,14640},
      {connection_channels,132040},
      {connection_other,477152},
      {queue_procs,65480},
      {queue_slave_procs,0},
      {plugins,2287080},
      {other_proc,19854000},
      {mnesia,77272},
      {metrics,239992},
      {mgmt_db,852688},
      {msg_index,44208},
      {other_ets,2577600},
      {binary,3923976},
      {code,24680786},
      {atom,1033401},
      {other_system,20660789}]},
 {alarms,[]},
 {listeners,[{clustering,25672,"::"},{amqp,5672,"::"},{http,15672,"::"}]},
 {vm_memory_high_watermark,0.4},
 {vm_memory_limit,830581964},
 {disk_free_limit,50000000},
 {disk_free,56083853312},
 {file_descriptors,
     [{total_limit,1048476},
      {total_used,13},
      {sockets_limit,943626},
      {sockets_used,10}]},
 {processes,[{limit,1048576},{used,420}]},
 {run_queue,0},
 {uptime,45431},
 {kernel,{net_ticktime,60}}]

Cluster status of node ksaprice_rabbitmq@4eed789778c0
[{nodes,[{disc,[ksaprice_rabbitmq@4eed789778c0]}]},
 {running_nodes,[ksaprice_rabbitmq@4eed789778c0]},
 {cluster_name,<<"ksaprice_rabbitmq@4eed789778c0">>},
 {partitions,[]},
 {alarms,[{ksaprice_rabbitmq@4eed789778c0,[]}]}]

Application environment of node ksaprice_rabbitmq@4eed789778c0
[{amqp_client,[{prefer_ipv6,false},{ssl_options,[]}]},
 {asn1,[]},
 {compiler,[]},
 {cowboy,[]},
 {cowlib,[]},
 {crypto,[]},
 {inets,[]},
 {kernel,
     [{error_logger,tty},
      {inet_default_connect_options,[{nodelay,true}]},
      {inet_dist_listen_max,25672},
      {inet_dist_listen_min,25672}]},
 {mnesia,[{dir,"/var/lib/rabbitmq/mnesia/ksaprice_rabbitmq"}]},
 {os_mon,
     [{start_cpu_sup,false},
      {start_disksup,false},
      {start_memsup,false},
      {start_os_sup,false}]},
 {public_key,[]},
 {rabbit,
     [{auth_backends,[rabbit_auth_backend_internal]},
      {auth_mechanisms,['PLAIN','AMQPLAIN']},
      {background_gc_enabled,false},
      {background_gc_target_interval,60000},
      {backing_queue_module,rabbit_priority_queue},
      {channel_max,0},
      {channel_operation_timeout,15000},
      {cluster_keepalive_interval,10000},
      {cluster_nodes,{[],disc}},
      {cluster_partition_handling,ignore},
      {collect_statistics,fine},
      {collect_statistics_interval,5000},
      {config_entry_decoder,
          [{cipher,aes_cbc256},
           {hash,sha512},
           {iterations,1000},
           {passphrase,undefined}]},
      {credit_flow_default_credit,{400,200}},
      {default_permissions,[<<".*">>,<<".*">>,<<".*">>]},
      {default_user,<<"guest">>},
      {default_user_tags,[administrator]},
      {default_vhost,<<"/">>},
      {delegate_count,16},
      {disk_free_limit,50000000},
      {disk_monitor_failure_retries,10},
      {disk_monitor_failure_retry_interval,120000},
      {enabled_plugins_file,"/etc/rabbitmq/enabled_plugins"},
      {error_logger,tty},
      {fhc_read_buffering,false},
      {fhc_write_buffering,true},
      {frame_max,131072},
      {halt_on_upgrade_failure,true},
      {handshake_timeout,10000},
      {heartbeat,60},
      {hipe_compile,false},
      {hipe_modules,
          [rabbit_reader,rabbit_channel,gen_server2,rabbit_exchange,
           rabbit_command_assembler,rabbit_framing_amqp_0_9_1,rabbit_basic,
           rabbit_event,lists,queue,priority_queue,rabbit_router,rabbit_trace,
           rabbit_misc,rabbit_binary_parser,rabbit_exchange_type_direct,
           rabbit_guid,rabbit_net,rabbit_amqqueue_process,
           rabbit_variable_queue,rabbit_binary_generator,rabbit_writer,
           delegate,gb_sets,lqueue,sets,orddict,rabbit_amqqueue,
           rabbit_limiter,gb_trees,rabbit_queue_index,
           rabbit_exchange_decorator,gen,dict,ordsets,file_handle_cache,
           rabbit_msg_store,array,rabbit_msg_store_ets_index,rabbit_msg_file,
           rabbit_exchange_type_fanout,rabbit_exchange_type_topic,mnesia,
           mnesia_lib,rpc,mnesia_tm,qlc,sofs,proplists,credit_flow,pmon,
           ssl_connection,tls_connection,ssl_record,tls_record,gen_fsm,ssl]},
      {lazy_queue_explicit_gc_run_operation_threshold,1000},
      {log_levels,[{connection,info}]},
      {loopback_users,[]},
      {memory_monitor_interval,2500},
      {mirroring_flow_control,true},
      {mirroring_sync_batch_size,4096},
      {mnesia_table_loading_retry_limit,10},
      {mnesia_table_loading_retry_timeout,30000},
      {msg_store_credit_disc_bound,{4000,800}},
      {msg_store_file_size_limit,16777216},
      {msg_store_index_module,rabbit_msg_store_ets_index},
      {msg_store_io_batch_size,4096},
      {num_ssl_acceptors,1},
      {num_tcp_acceptors,10},
      {password_hashing_module,rabbit_password_hashing_sha256},
      {plugins_dir,
          "/usr/lib/rabbitmq/plugins:/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.10/plugins"},
      {plugins_expand_dir,
          "/var/lib/rabbitmq/mnesia/ksaprice_rabbitmq-plugins-expand"},
      {queue_explicit_gc_run_operation_threshold,1000},
      {queue_index_embed_msgs_below,4096},
      {queue_index_max_journal_entries,32768},
      {reverse_dns_lookups,false},
      {sasl_error_logger,tty},
      {server_properties,[]},
      {ssl_allow_poodle_attack,false},
      {ssl_apps,[asn1,crypto,public_key,ssl]},
      {ssl_cert_login_from,distinguished_name},
      {ssl_handshake_timeout,5000},
      {ssl_listeners,[]},
      {ssl_options,[]},
      {tcp_listen_options,
          [{backlog,128},
           {nodelay,true},
           {linger,{true,0}},
           {exit_on_close,false}]},
      {tcp_listeners,[5672]},
      {trace_vhosts,[]},
      {vm_memory_high_watermark,0.4},
      {vm_memory_high_watermark_paging_ratio,0.5}]},
 {rabbit_common,[]},
 {rabbitmq_management,
     [{cors_allow_origins,[]},
      {cors_max_age,1800},
      {http_log_dir,none},
      {listener,[{port,15672}]},
      {load_definitions,none},
      {management_db_cache_multiplier,5},
      {process_stats_gc_timeout,300000},
      {stats_event_max_backlog,250}]},
 {rabbitmq_management_agent,
     [{rates_mode,basic},
      {sample_retention_policies,
          [{global,[{605,5},{3660,60},{29400,600},{86400,1800}]},
           {basic,[{605,5},{3600,60}]},
           {detailed,[{605,5}]}]}]},
 {rabbitmq_web_dispatch,[]},
 {ranch,[]},
 {sasl,[{errlog_type,error},{sasl_error_logger,tty}]},
 {ssl,[]},
 {stdlib,[]},
 {syntax_tools,[]},
 {xmerl,[]}]

Connections:
pid     name    port    peer_port       host    peer_host       ssl     peer_cert_subject       peer_cert_issuer        peer_cert_validity      auth_mechanismssl_protocol    ssl_key_exchange        ssl_cipher      ssl_hash        protocol        user    vhost   timeout frame_max       channel_max     client_properties     connected_at    recv_oct        recv_cnt        send_oct        send_cnt        send_pend       state   channels        reductions      garbage_collection
<ksaprice_rabbitmq@4eed789778c0.1.7700.0>       172.25.0.2:47982 -> 172.25.0.4:5672     5672    47982   172.25.0.4      172.25.0.2      false                         AMQPLAIN                                        {0,9,1} admin   ksaprice_rabbitmq_vh    0       131072  65535   [{"product","py-amqp"},{"product_version","2.2.1"},{"capabilities",[{"connection.blocked",true},{"authentication_failure_close",true},{"consumer_cancel_notify",true}]}]      1501749265595   1897  10      606     7       0       running 1       235055  [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,170}]
<ksaprice_rabbitmq@4eed789778c0.1.7755.0>       172.25.0.2:48764 -> 172.25.0.4:5672     5672    48764   172.25.0.4      172.25.0.2      false                         AMQPLAIN                                        {0,9,1} admin   ksaprice_rabbitmq_vh    0       131072  65535   [{"product","py-amqp"},{"product_version","2.2.1"},{"capabilities",[{"connection.blocked",true},{"authentication_failure_close",true},{"consumer_cancel_notify",true}]}]      1501749346461   289   5       554     4       0       running 1       226409  [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,161}]
<ksaprice_rabbitmq@4eed789778c0.1.7764.0>       172.25.0.2:48766 -> 172.25.0.4:5672     5672    48766   172.25.0.4      172.25.0.2      false                         AMQPLAIN                                        {0,9,1} admin   ksaprice_rabbitmq_vh    0       131072  65535   [{"product","py-amqp"},{"product_version","2.2.1"},{"capabilities",[{"connection.blocked",true},{"authentication_failure_close",true},{"consumer_cancel_notify",true}]}]      1501749346494   1647  21      1030    20      0       running 1       228859  [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,171}]
<ksaprice_rabbitmq@4eed789778c0.1.7767.0>       172.25.0.2:48768 -> 172.25.0.4:5672     5672    48768   172.25.0.4      172.25.0.2      false                         AMQPLAIN                                        {0,9,1} admin   ksaprice_rabbitmq_vh    0       131072  65535   [{"product","py-amqp"},{"product_version","2.2.1"},{"capabilities",[{"connection.blocked",true},{"authentication_failure_close",true},{"consumer_cancel_notify",true}]}]      1501749346494   569   9       662     8       0       running 1       226947  [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,164}]
<ksaprice_rabbitmq@4eed789778c0.1.7770.0>       172.25.0.2:48770 -> 172.25.0.4:5672     5672    48770   172.25.0.4      172.25.0.2      false                         AMQPLAIN                                        {0,9,1} admin   ksaprice_rabbitmq_vh    0       131072  65535   [{"product","py-amqp"},{"product_version","2.2.1"},{"capabilities",[{"connection.blocked",true},{"authentication_failure_close",true},{"consumer_cancel_notify",true}]}]      1501749346495   1647  20      1030    20      0       running 1       228798  [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,167}]
<ksaprice_rabbitmq@4eed789778c0.1.7787.0>       172.25.0.2:48772 -> 172.25.0.4:5672     5672    48772   172.25.0.4      172.25.0.2      false                         AMQPLAIN                                        {0,9,1} admin   ksaprice_rabbitmq_vh    0       131072  65535   [{"product","py-amqp"},{"product_version","2.2.1"},{"capabilities",[{"connection.blocked",true},{"authentication_failure_close",true},{"consumer_cancel_notify",true}]}]      1501749346511   85953 485     1042    21      0       running 1       280680  [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,110}]
<ksaprice_rabbitmq@4eed789778c0.1.7806.0>       172.25.0.2:48774 -> 172.25.0.4:5672     5672    48774   172.25.0.4      172.25.0.2      false                         AMQPLAIN                                        {0,9,1} admin   ksaprice_rabbitmq_vh    0       131072  65535   [{"product","py-amqp"},{"product_version","2.2.1"},{"capabilities",[{"connection.blocked",true},{"authentication_failure_close",true},{"consumer_cancel_notify",true}]}]      1501749346548   665   7       566     5       0       running 1       226665  [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,168}]
<ksaprice_rabbitmq@4eed789778c0.1.7815.0>       172.25.0.2:48776 -> 172.25.0.4:5672     5672    48776   172.25.0.4      172.25.0.2      false                         AMQPLAIN                                        {0,9,1} admin   ksaprice_rabbitmq_vh    0       131072  65535   [{"product","py-amqp"},{"product_version","2.2.1"},{"capabilities",[{"connection.blocked",true},{"authentication_failure_close",true},{"consumer_cancel_notify",true}]}]      1501749346551   1647  21      1030    20      0       running 1       228859  [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,171}]
<ksaprice_rabbitmq@4eed789778c0.1.7839.0>       172.25.0.2:48780 -> 172.25.0.4:5672     5672    48780   172.25.0.4      172.25.0.2      false                         AMQPLAIN                                        {0,9,1} admin   ksaprice_rabbitmq_vh    0       131072  65535   [{"product","py-amqp"},{"product_version","2.2.1"},{"capabilities",[{"connection.blocked",true},{"authentication_failure_close",true},{"consumer_cancel_notify",true}]}]      1501749346576   1691  9       566     5       0       running 1       226936  [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,169}]
<ksaprice_rabbitmq@4eed789778c0.1.7842.0>       172.25.0.2:48778 -> 172.25.0.4:5672     5672    48778   172.25.0.4      172.25.0.2      false                         AMQPLAIN                                        {0,9,1} admin   ksaprice_rabbitmq_vh    0       131072  65535   [{"product","py-amqp"},{"product_version","2.2.1"},{"capabilities",[{"connection.blocked",true},{"authentication_failure_close",true},{"consumer_cancel_notify",true}]}]      1501749346576   1496  9       566     5       0       running 1       226885  [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,161}]

Channels:
pid     name    connection      number  user    vhost   reductions      transactional   confirm consumer_count  messages_unacknowledged messages_unconfirmed  messages_uncommitted    acks_uncommitted        prefetch_count  global_prefetch_count   state   garbage_collection
<ksaprice_rabbitmq@4eed789778c0.1.7706.0>       172.25.0.2:47982 -> 172.25.0.4:5672 (1) <ksaprice_rabbitmq@4eed789778c0.1.7700.0>       1       admin   ksaprice_rabbitmq_vh  4140    false   false   0       0       0       0       0       0       0       running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,8}]
<ksaprice_rabbitmq@4eed789778c0.1.7761.0>       172.25.0.2:48764 -> 172.25.0.4:5672 (1) <ksaprice_rabbitmq@4eed789778c0.1.7755.0>       1       admin   ksaprice_rabbitmq_vh  1706    false   false   0       0       0       0       0       0       0       running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,8}]
<ksaprice_rabbitmq@4eed789778c0.1.7776.0>       172.25.0.2:48768 -> 172.25.0.4:5672 (1) <ksaprice_rabbitmq@4eed789778c0.1.7767.0>       1       admin   ksaprice_rabbitmq_vh  4737    false   false   1       0       0       0       0       0       0       running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,6}]
<ksaprice_rabbitmq@4eed789778c0.1.7788.0>       172.25.0.2:48770 -> 172.25.0.4:5672 (1) <ksaprice_rabbitmq@4eed789778c0.1.7770.0>       1       admin   ksaprice_rabbitmq_vh  8608    false   false   0       0       0       0       0       0       0       running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,3}]
<ksaprice_rabbitmq@4eed789778c0.1.7793.0>       172.25.0.2:48766 -> 172.25.0.4:5672 (1) <ksaprice_rabbitmq@4eed789778c0.1.7764.0>       1       admin   ksaprice_rabbitmq_vh  7977    false   false   0       0       0       0       0       0       0       running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,3}]
<ksaprice_rabbitmq@4eed789778c0.1.7812.0>       172.25.0.2:48772 -> 172.25.0.4:5672 (1) <ksaprice_rabbitmq@4eed789778c0.1.7787.0>       1       admin   ksaprice_rabbitmq_vh  116017  false   false   0       0       0       0       0       0       0       running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,1}]
<ksaprice_rabbitmq@4eed789778c0.1.7827.0>       172.25.0.2:48776 -> 172.25.0.4:5672 (1) <ksaprice_rabbitmq@4eed789778c0.1.7815.0>       1       admin   ksaprice_rabbitmq_vh  7977    false   false   0       0       0       0       0       0       0       running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,3}]
<ksaprice_rabbitmq@4eed789778c0.1.7835.0>       172.25.0.2:48774 -> 172.25.0.4:5672 (1) <ksaprice_rabbitmq@4eed789778c0.1.7806.0>       1       admin   ksaprice_rabbitmq_vh  3048    false   false   0       0       0       0       0       0       0       running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,4}]
<ksaprice_rabbitmq@4eed789778c0.1.7854.0>       172.25.0.2:48778 -> 172.25.0.4:5672 (1) <ksaprice_rabbitmq@4eed789778c0.1.7842.0>       1       admin   ksaprice_rabbitmq_vh  2854    false   false   0       0       0       0       0       0       0       running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,4}]
<ksaprice_rabbitmq@4eed789778c0.1.7855.0>       172.25.0.2:48780 -> 172.25.0.4:5672 (1) <ksaprice_rabbitmq@4eed789778c0.1.7839.0>       1       admin   ksaprice_rabbitmq_vh  3245    false   false   0       0       0       0       0       0       0       running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,6}]

Queues on ksaprice_rabbitmq_vh:
pid     name    durable auto_delete     arguments       owner_pid       exclusive       messages_ready  messages_unacknowledged messages        reductions    policy  exclusive_consumer_pid  exclusive_consumer_tag  consumers       consumer_utilisation    memory  slave_pids      synchronised_slave_pids recoverable_slaves    state   garbage_collection      messages_ram    messages_ready_ram      messages_unacknowledged_ram     messages_persistent     message_bytes   message_bytes_ready   message_bytes_unacknowledged    message_bytes_ram       message_bytes_persistent        head_message_timestamp  disk_reads      disk_writes   backing_queue_status    messages_paged_out      message_bytes_paged_out
<ksaprice_rabbitmq@4eed789778c0.1.7670.0>       default true    false   []              false   6       0       6       88075                           0             89344                           running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,3}]  6       6       0       6       1231    1231    0       1231    1231            0       6       [{mode,default}, {q1,0}, {q2,0}, {delta,{delta,undefined,0,0,undefined}}, {q3,0}, {q4,6}, {len,6}, {target_ram_count,infinity}, {next_seq_id,6}, {avg_ingress_rate,9.303060867567184e-92}, {avg_egress_rate,0.0}, {avg_ack_ingress_rate,0.0}, {avg_ack_egress_rate,0.0}]    0       0
<ksaprice_rabbitmq@4eed789778c0.1.7861.0>       celeryev.b957bbf3-8b97-4633-897f-a887b49e617b   false   true    [{"x-message-ttl",5000},{"x-expires",60000}]          false   0       0       0       4739                            1       1.0     22160                           running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,8}]  0       0       0       0       0       0       0       0       0             0       0       [{mode,default}, {q1,0}, {q2,0}, {delta,{delta,undefined,0,0,undefined}}, {q3,0}, {q4,0}, {len,0}, {target_ram_count,infinity}, {next_seq_id,0}, {avg_ingress_rate,0.0}, {avg_egress_rate,0.0}, {avg_ack_ingress_rate,0.0}, {avg_ack_egress_rate,0.0}]        0       0

Queues on ksaprice_rabbitmq:

Queues on /:

Exchanges on ksaprice_rabbitmq_vh:
name    type    durable auto_delete     internal        arguments       policy
        direct  true    false   false   []
amq.direct      direct  true    false   false   []
amq.fanout      fanout  true    false   false   []
amq.headers     headers true    false   false   []
amq.match       headers true    false   false   []
amq.rabbitmq.trace      topic   true    false   true    []
amq.topic       topic   true    false   false   []
celery.pidbox   fanout  false   false   false   []
celeryev        topic   true    false   false   []
default direct  true    false   false   []
reply.celery.pidbox     direct  false   false   false   []

Exchanges on ksaprice_rabbitmq:
name    type    durable auto_delete     internal        arguments       policy
        direct  true    false   false   []
amq.direct      direct  true    false   false   []
amq.fanout      fanout  true    false   false   []
amq.headers     headers true    false   false   []
amq.match       headers true    false   false   []
amq.rabbitmq.trace      topic   true    false   true    []
amq.topic       topic   true    false   false   []

Exchanges on /:
name    type    durable auto_delete     internal        arguments       policy
        direct  true    false   false   []
amq.direct      direct  true    false   false   []
amq.fanout      fanout  true    false   false   []
amq.headers     headers true    false   false   []
amq.match       headers true    false   false   []
amq.rabbitmq.log        topic   true    false   true    []
amq.rabbitmq.trace      topic   true    false   true    []
amq.topic       topic   true    false   false   []

Bindings on ksaprice_rabbitmq_vh:
source_name     source_kind     destination_name        destination_kind        routing_key     arguments       vhost
        exchange        celeryev.b957bbf3-8b97-4633-897f-a887b49e617b   queue   celeryev.b957bbf3-8b97-4633-897f-a887b49e617b   []      ksaprice_rabbitmq_vh
        exchange        default queue   default []      ksaprice_rabbitmq_vh
celeryev        exchange        celeryev.b957bbf3-8b97-4633-897f-a887b49e617b   queue   #       []      ksaprice_rabbitmq_vh
default exchange        default queue   default []      ksaprice_rabbitmq_vh

Bindings on ksaprice_rabbitmq:

Bindings on /:

Consumers on ksaprice_rabbitmq_vh:
queue_name      channel_pid     consumer_tag    ack_required    prefetch_count  arguments
celeryev.b957bbf3-8b97-4633-897f-a887b49e617b   <ksaprice_rabbitmq@4eed789778c0.1.7776.0>       None4   false   0       []

Consumers on ksaprice_rabbitmq:

Consumers on /:

Permissions on ksaprice_rabbitmq_vh:
user    configure       write   read
admin   .*      .*      .*

Permissions on ksaprice_rabbitmq:

Permissions on /:
user    configure       write   read
guest   .*      .*      .*

Policies on ksaprice_rabbitmq_vh:

Policies on ksaprice_rabbitmq:

Policies on /:

Parameters on ksaprice_rabbitmq_vh:

Parameters on ksaprice_rabbitmq:

Parameters on /:

最佳答案

关于 Airflow 悬挂

我不了解 Airflow,但我相信您用于定位 RabbitMQ 的 URL 不正确:

broker_url = amqp://guest:***@ksaprice_rabbitmq:15672//

RabbitMQ 使用TCP 端口 15672 作为其管理 Web UI,因此它是一个监听该端口的 HTTP 服务器。

AMQP 端口为 5672(标准端口)。所以我会尝试使用以下 URL:

broker_url = amqp://guest:***@ksaprice_rabbitmq//

即没有端口,因为客户端应该默认为标准端口。

关于工作人员未执行的任务

rabbitmqctl报告输出中,我们可以看到:

  • 有一个名为 default 的队列,其中有 6 条消息正在等待,但没有消费者订阅它。
  • 有一个名为 celeryev.b957bbf3-8b97-4633-897f-a887b49e617b 的队列,没有消息,并且有一个消费者在等待消息。

因此,任何一方(调度程序或工作人员)必定存在一些不正确或丢失的配置,因为它们当前没有相互通信:任务到达一个队列,但工作人员正在监视另一个队列。

关于rabbitmq - Airflow Scheduler 和 Web 服务器在对 RabbitMQ 上运行的任务进行排队时挂起,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45478431/

相关文章:

python - celery 中重复的任务

python - AirFlow 调度程序 - 运行日期

c# - MassTransit RabbitMq 发送消息

通过 Web 控制台设置时,Rabbitmq x-overflow 拒绝发布不起作用

rabbitmq - RabbitMQ 中的过期时间

docker - Celery beat + redis with password抛出No Auth异常

celery - 克隆 celery 链

java - rabbitmq中的路由键区分大小写吗?

python - Airflow 网络服务器不启动,除非在 Debug模式下

airflow - 由于无法读取日志文件,任务失败