python - 使用气流将文件流式传输到kafka

标签 python apache-airflow

使用气流将 CSV 文件流式传输到 kafka 主题的最佳方法是什么?

为气流编写自定义运算符?

最佳答案

可能最好使用 PythonOperator 逐行处理文件。我有一个用例,我轮询和 SFTP 服务器获取文件,当我找到一些文件时,我逐行处理它们,将结果写为 JSON。我会做一些事情,比如将日期解析为 YYYY-MM-DD 格式等。这样的事情可能对你有用:

def csv_file_to_kafka(**context):

    f = '/path/to/downloaded/csv_file.csv'
    csvfile = open(f, 'r')
    reader = csv.DictReader(csvfile)

    for row in reader:
        """
        Send the row to Kafka
        """
    return 

csv_file_to_kafka = PythonOperator(
   task_id='csv_file_to_kafka',
   python_callable=csv_file_to_kafka,
   dag=dag
)

现在如何下载文件完全取决于您。在我的例子中,我使用 SSHHookGoogleCloudStorageHook 从 SFTP 服务器获取文件,然后将文件名传递给解析和清理 csv 文件的任务。我通过从 SFTP 中提取文件并将它们放入 Google Cloud Storage 来执行此操作:

"""
HOOKS: Connections to external systems
"""
def sftp_connection():
    """
    Returns an SFTP connection created using the SSHHook
    """
    ssh_hook = SSHHook(ssh_conn_id='sftp_connection')
    ssh_client = ssh_hook.get_conn()
    return ssh_client.open_sftp()
def gcs_connection():
    """
    Returns an GCP connection created using the GoogleCloudStorageHook
    """
    return GoogleCloudStorageHook(google_cloud_storage_conn_id='my_gcs_connection')

"""
PYTHON CALLABLES: Called by PythonOperators
"""
def get_files(**context):
    """
    Looks at all files on the FTP server and returns a list files.
    """
    sftp_client = sftp_connection()
    all_files = sftp_client.listdir('/path/to/files/')
    files = []

    for f in all_files:
        files.append(f)

    return files

def save_files(**context):
    """
    Looks to see if a file already exists in GCS. If not, the file is downloaed
    from SFTP server and uploaded to GCS. A list of
    """
    files = context['task_instance'].xcom_pull(task_ids='get_files')

    sftp_client = sftp_connection()
    gcs = gcs_connection()
    new_files = []
    new_outcomes_files = []
    new_si_files = []

    new_files = process_sftp_files(files, gcs, sftp_client)

    return new_files

def csv_file_to_kafka(**context):
    """
    Untested sample parse csv files and send to kafka
    """
    files = context['task_instance'].xcom_pull(task_ids='save_files')
    for f in new_files:
        csvfile = open(f, 'r')
        reader = csv.DictReader(csvfile)

        for row in reader:
            """
            Send the row to Kafka
            """
    return 

get_files = PythonOperator(
   task_id='get_files',
   python_callable=get_files,
   dag=dag
)
save_files = PythonOperator(
   task_id='save_files',
   python_callable=save_files,
   dag=dag
)
csv_file_to_kafka = PythonOperator(
   task_id='csv_file_to_kafka',
   python_callable=csv_file_to_kafka,
   dag=dag
)

我知道我可以在一个大的 python 可调用文件中完成这一切,这就是我现在重构代码以便在可调用文件中的方式。所以它轮询 SFTP 服务器,提取最新文件,并根据我的规则在一个 python 函数中解析它们。我听说使用 XCom 并不理想,Airflow 任务不应该相互通信太多,据说。

根据您的用例,您甚至可能想要探索类似 Apache Nifi 的内容,实际上我现在也在研究这个问题。

关于python - 使用气流将文件流式传输到kafka,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46778171/

相关文章:

airflow - 我如何在 Airflow 中使用 --conf 选项

user-interface - 在虚拟机上运行的Apache Airflow Web服务器生成的View UI界面

python - 从 Pandas Dataframe 中获取某些元素

python - 如何计算 Python 中非高斯分布中值的中值和 68% 置信区间?

python - 在 Mac (Catalina) 上安装 PyGame 时出错

python - docker-compose exec 导致 [Errno 2] 没有这样的文件或目录 : 'docker-compose' : 'docker-compose' in docker container

python - mysql 如何格式化日期时间?

python - Apache Airflow DAG 无法导入本地模块

airflow - 在airflow中通过admin上传变量需要什么文件格式?

airflow - 如何在使用 Airflow 实现的工作流中等待 DAG 任务中的异步事件?