Python:inotify、concurrent.futures - 如何添加现有文件

标签 python inotify concurrent.futures

我有这个简单的脚本,可以使用 inotify 处理文件模块和mulit-threading :

import concurrent.futures

import inotify.adapters

def main():
    i = inotify.adapters.Inotify()

    i.add_watch(b'/data')

    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        try:
            for event in i.event_gen():
                if event is not None:
                    (header, type_names, watch_path, filename) = event
                    # inotify event: IN_CLOSE_WRITE
                    if header.mask == 8:
                        future = executor.submit(process, filename.decode('utf-8'))
                        future.add_done_callback(future_callback)
        finally:
            i.remove_watch(b'/data')

if __name__ == '__main__':
    main()

我遇到的问题是,在脚本实际启动之前,监视的目录可能包含许多文件。

我想到了类似下面示例的内容,但这不会启动“产生”inotify 生成器,直到处理所有现有文件,并且它还会错过在此期间创建的新事件:

import concurrent.futures

import inotify.adapters

def main():
    i = inotify.adapters.Inotify()

    i.add_watch(b'/data')

    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        files = os.listdir('/data')
        if files:
            for filename in files:
                future = executor.submit(run, filename)
                future.add_done_callback(future_callback)
        try:
            for event in i.event_gen():
                if event is not None:
                    (header, type_names, watch_path, filename) = event
                    # inotify event: IN_CLOSE_WRITE
                    if header.mask == 8:
                        future = executor.submit(process, filename.decode('utf-8'))
                        future.add_done_callback(future_callback)
        finally:
            i.remove_watch(b'/data')

if __name__ == '__main__':
    main()

有没有办法手动发送 inotify 事件或将这些文件添加到 i.event_gen() 生成器?

最佳答案

下面是一个示例,该示例在其中一个工作人员内部处理旧文件,允许在处理旧的现有文件时并行捕获新事件。根据记录,即使使用您的线性代码,我也没有遇到丢失事件的问题。

此外,PyInotify 模块“已失效且不再可用”。根据这个inotify module我用过。

#!/usr/bin/env python3

import concurrent.futures
import inotify.adapters
import time
import os
from functools import partial


DIRECTORY='.'


def run(filename, suffix=''):
    time.sleep(1)
    return 'run: ' + filename + suffix


def process(filename):
    return run(filename, suffix=' (inotify)')


def future_callback(fut):
    print('future_callback: ' + fut.result())


def do_directory(executor):
    fn = partial(run, suffix=' (dir list)')
    for filename in os.listdir(DIRECTORY):
        future = executor.submit(fn, filename)
        future.add_done_callback(future_callback)


def main():
    i = inotify.adapters.Inotify()

    i.add_watch(DIRECTORY.encode())

    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # Process the directory in a thread or locally. Not sure if it
        # is safe to submit to the executor from within one its workers.
        # Seems like it should be.
        executor.submit(do_directory, executor)
        # do_directory(executor)
        try:
            for event in i.event_gen():
                if event is not None:
                    (header, type_names, watch_path, filename) = event
                    # inotify event: IN_CLOSE_WRITE
                    if header.mask == 8:
                        future = executor.submit(process, filename.decode('utf-8'))
                        future.add_done_callback(future_callback)
                        print('Submitted inotify for', filename.decode())
        except KeyboardInterrupt:
            pass
        finally:
            i.remove_watch(DIRECTORY.encode())


if __name__ == '__main__':
    main()

测试:

从包含 10 个文件的目录开始。启动程序,等待 2 秒钟,然后创建 5 个新文件。查找“提交”消息以查看事件是否已接收并排队,同时仍在处理初始文件以及新文件最终是否得到处理。

~/p/TEST $ touch A1 A2 A3 A4 A5 A6 A7 A8 A9 A10
~/p/TEST $ do_test() {
> rm B*
> ../inotify-test.py &
> sleep 2
> touch B1 B2 B3 B4 B5
> sleep 5
> pkill -f inotify-test.py
> }
~/p/TEST $ do_test
[1] 26663
future_callback: run: A10 (dir list)
future_callback: run: A4 (dir list)
future_callback: run: A5 (dir list)
future_callback: run: A9 (dir list)
future_callback: run: A2 (dir list)
Submitted inotify for B1
Submitted inotify for B2
Submitted inotify for B3
Submitted inotify for B4
Submitted inotify for B5
future_callback: run: A3 (dir list)
future_callback: run: A8 (dir list)
future_callback: run: A1 (dir list)
future_callback: run: A7 (dir list)
future_callback: run: A6 (dir list)
future_callback: run: B1 (inotify)
future_callback: run: B2 (inotify)
future_callback: run: B3 (inotify)
future_callback: run: B4 (inotify)
future_callback: run: B5 (inotify)
~/p/TEST $ 
[1]+  Terminated              ../inotify-test.py
~/p/TEST $ 

关于Python:inotify、concurrent.futures - 如何添加现有文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47311118/

相关文章:

python - pyinotify 方法未调用

scala - 通过 Scala Spark 并行读取单独的目录并创建单独的 RDD

python - 任何实际有效的 concurrent.futures 超时?

python - 如何检查一个类是否已在同一模块中的另一个类中使用?

python - 使用 pandas resample 从重采样中排除特定列

c - 在 C 中 inotify 文件

python - ThreadPoolExecutor().map 与 ThreadPoolExecutor().submit 有何不同?

python - 将缩写词替换为其值 Python

python - 为什么我的想法在 python2 中不起作用?

linux - 使用 Incrond Inotify 但遇到用户组/权限问题