我正在 Jupyter 5 上使用 Python 3.6.1。我的目标是测试 portalocker管理对同一文件的并发追加。
为了实现这一点,我创建了一个简单的函数,将一行附加到同一个文件中,并使用 multiprocessing.Pool 和 Pool.map() 并行运行该函数。
这是 Jupyter notebook 中的代码。
单元格 1
from time import time
from multiprocessing import Pool
import portalocker
def f(*args):
while time() < start + 1:
pass
with open('portalocker_test.txt', 'a') as f:
portalocker.lock(f, portalocker.LOCK_EX)
f.write(f'{time()}\n')
单元格 2
start = time()
with Pool(4) as p:
p.map(f, range(4))
单元格 3
with open('portalocker_test.txt', 'r') as f:
for line in f:
print(line, end='')
如果我在获得预期结果后运行此代码:
在单元格 3 之外:
1495614277.189394
1495614277.1893928
1495614277.1893911
1495614277.1894028
但是,如果我再次运行单元格 2(不重新启动笔记本),我会得到:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-5-db9c07d32724> in <module>()
1 start = time()
2 with Pool(4) as p:
----> 3 p.map(f, range(4))
/Users/xxx/Homebrew/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/pool.py in map(self, func, iterable, chunksize)
258 in a list that is returned.
259 '''
--> 260 return self._map_async(func, iterable, mapstar, chunksize).get()
261
262 def starmap(self, func, iterable, chunksize=None):
/Users/xxx/Homebrew/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/pool.py in get(self, timeout)
606 return self._value
607 else:
--> 608 raise self._value
609
610 def _set(self, i, obj):
/Users/xxx/Homebrew/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/pool.py in _handle_tasks(taskqueue, put, outqueue, pool, cache)
383 break
384 try:
--> 385 put(task)
386 except Exception as e:
387 job, ind = task[:2]
/Users/xxx/Homebrew/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/connection.py in send(self, obj)
204 self._check_closed()
205 self._check_writable()
--> 206 self._send_bytes(_ForkingPickler.dumps(obj))
207
208 def recv_bytes(self, maxlength=None):
/Users/xxx/Homebrew/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/reduction.py in dumps(cls, obj, protocol)
49 def dumps(cls, obj, protocol=None):
50 buf = io.BytesIO()
---> 51 cls(buf, protocol).dump(obj)
52 return buf.getbuffer()
53
TypeError: cannot serialize '_io.TextIOWrapper' object
如果我在运行单元格 2 之前读取文件,则会引发相同的错误。因此,如果我在运行单元格 2 之前从未打开文件,则一切正常。如果我之前打开该文件,则会出现该错误。
这对我来说很不一致。到底是怎么回事?如何解决?
此外,使用或不使用 portalocker 不会改变这种行为,所以它不是 portalocker 的问题。我没有在普通 python 上检查过它,但我真的很想用 Jupyter 运行它。
最佳答案
问题是你应该避免不同对象的相同名称,在你的情况下应该有帮助
f
更改函数名称至 function
(或不同于 f
的其他名称)单元格 1
from time import time
from multiprocessing import Pool
import portalocker
def function(*args):
while time() < start + 1:
pass
with open('portalocker_test.txt', 'a') as f:
portalocker.lock(f, portalocker.LOCK_EX)
f.write(f'{time()}\n')
单元格 2
start = time()
with Pool(4) as p:
p.map(function, range(4))
或者
open
获得的文件对象来自 f
至 file
(或不同于 f
的其他名称):单元格 1
from time import time
from multiprocessing import Pool
import portalocker
def f(*args):
while time() < start + 1:
pass
with open('portalocker_test.txt', 'a') as file:
portalocker.lock(file, portalocker.LOCK_EX)
file.write(f'{time()}\n')
单元格 3
with open('portalocker_test.txt', 'r') as file:
for line in file:
print(line, end='')
或两者
关于python-3.x - 不一致的类型错误 : cannot serialize '_io.TextIOWrapper' object,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44153174/