背景
我使用来自合成孔径雷达卫星的非常大的数据集。这些可以被认为是一侧 10k 像素数量级的高动态范围灰度图像。
最近,我一直在开发 Lindeberg's scale-space ridge detection algorithm method 的单尺度变体的应用程序。用于检测 SAR 图像中的线性特征。这是对使用定向滤波器或使用霍夫变换的改进,这两种方法以前都使用过,因为它的计算成本低于任何一种。 (我将在 4 月份在 JURSE 2011 上展示一些最近的结果,如果有帮助,我可以上传预印本)。
我目前使用的代码生成一个记录数组,每个像素一个,每个记录描述像素右下角矩形中的一个脊段,并以相邻像素为界。
struct ridge_t { unsigned char top, left, bottom, right };
int rows, cols;
struct ridge_t *ridges; /* An array of rows*cols ridge entries */
ridges
中的条目如果恰好是 top
中的两个,则包含一个脊线段, left
, right
和 bottom
有 0 - 128 范围内的值。假设我有:ridge_t entry;
entry.top = 25; entry.left = 255; entry.bottom = 255; entry.right = 76;
然后我可以找到脊线段的起点 (x1,y1) 和终点 (x2,y2):
float x1, y1, x2, y2;
x1 = (float) col + (float) entry.top / 128.0;
y1 = (float) row;
x2 = (float) col + 1;
y2 = (float) row + (float) entry.right / 128.0;
当这些单独的山脊段被渲染时,我得到一个像这样的图像(更大图像的一个很小的角落):
这些长曲线中的每一条都是由一系列微小的山脊段渲染而成的。
确定包含脊段的两个相邻位置是否连接是微不足道的。如果我有
ridge1
在 (x, y) 和 ridge2
在 (x+1, y) 处,如果 0 <= ridge1.right
,则它们是同一行的一部分<= 128 和 ridge2.left
= ridge1.right
.问题
理想情况下,我想将所有脊线段拼接成线,以便我可以遍历图像中找到的每条线以应用进一步的计算。不幸的是,我发现很难找到一种算法来做到这一点,它既低复杂性又节省内存并且适合多处理(处理非常大的图像时所有重要的考虑因素!)
我考虑过的一种方法是扫描图像,直到找到一个只有一个连接脊段的脊,然后沿着结果线走,将线中的任何脊标记为已访问。但是,这不适合多处理,因为如果没有昂贵的锁定,就无法判断是否有另一个线程从另一个方向走同一条线(比如说)。
读者建议什么作为可能的方法?过去似乎有人会想出一种有效的方法来做这种事情......
最佳答案
我不完全确定这是正确的,但我想我会把它扔出去征求意见。首先,让我介绍一个无锁不相交集算法,它将构成我提出的算法的重要组成部分。
无锁不相交集算法
我假设在您选择的 CPU 架构上存在两个指针大小的比较和交换操作。这至少在 x86 和 x64 架构上可用。
该算法与 the Wikipedia page for the single threaded case 中描述的大致相同,为了安全的无锁操作进行了一些修改。首先,我们要求 rank 和 parent 元素都是指针大小的,并且在内存中对齐到 2*sizeof(pointer) ,以便稍后用于原子 CAS。
Find() 不需要改变;最坏的情况是路径压缩优化在同时写入的情况下将无法完全发挥作用。
但是, Union() 必须改变:
function Union(x, y)
redo:
x = Find(x)
y = Find(y)
if x == y
return
xSnap = AtomicRead(x) -- read both rank and pointer atomically
ySnap = AtomicRead(y) -- this operation may be done using a CAS
if (xSnap.parent != x || ySnap.parent != y)
goto redo
-- Ensure x has lower rank (meaning y will be the new root)
if (xSnap.rank > ySnap.rank)
swap(xSnap, ySnap)
swap(x, y)
-- if same rank, use pointer value as a fallback sort
else if (xSnap.rank == ySnap.rank && x > y)
swap(xSnap, ySnap)
swap(x, y)
yNew = ySnap
yNew.rank = max(yNew.rank, xSnap.rank + 1)
xNew = xSnap
xNew.parent = y
if (!CAS(y, ySnap, yNew))
goto redo
if (!CAS(x, xSnap, xNew))
goto redo
return
这应该是安全的,因为它永远不会形成循环,并且总是会导致正确的 union 。我们可以通过观察来证实这一点:
在同时发生变异的情况下,有可能是 y 的 rank 可能会增加,然后由于冲突而返回重做。但是,这意味着 y 不再是根(在这种情况下排名无关紧要)或者 y 的排名已被另一个过程增加(在这种情况下,第二次循环将无效并且 y 将具有正确的排名)。
因此,应该没有形成循环的机会,并且这种无锁不相交集算法应该是安全的。
现在开始解决您的问题...
假设
我假设脊线段只能在它们的端点相交。如果不是这种情况,您将需要以某种方式更改阶段 1。
我还假设单个整数像素位置的共存足以连接脊线段。如果没有,您将需要更改阶段 1 中的数组以保存多个候选脊线段+不相交集对,并过滤以找到实际连接的那些。
本算法中使用的不相交集结构应在其结构中携带对线段的引用。在合并的情况下,我们任意选择两个记录的片段之一来表示集合。
第一阶段:本地线路识别
我们首先将 map 划分为多个扇区,每个扇区将作为单独的作业进行处理。多个作业可能会在不同的线程中处理,但每个作业只会由一个线程处理。如果脊段穿过一个扇区,则它被分成两段,每个扇区一个。
对于每个扇区,建立一个将像素位置映射到不相交集结构的阵列。这个数组大部分会在后面被丢弃,所以它的内存需求应该不会成为太大的负担。
然后我们继续处理扇区中的每个线段。我们首先选择一个不相交的集合,代表该线段构成的整条线。我们首先在像素位置数组中查找每个端点,看看是否已经分配了一个不相交的集合结构。如果端点之一已经在这个数组中,我们使用分配的不相交集。如果两者都在数组中,我们对不相交的集合执行合并,并使用新的根作为我们的集合。否则,我们创建一个新的不相交集,并将对当前线段的引用与不相交集结构相关联。然后我们将每个端点的新不相交集的根写回到像素位置数组中。
对扇区中的每个线段重复这个过程;到最后,我们将通过一个不相交的集合完全识别扇区内的所有线。
请注意,由于不相交的集合尚未在线程之间共享,因此还没有必要使用比较和交换操作;只需使用普通的单线程 union 合并算法。由于我们在算法完成之前不会释放任何不相交的集合结构,因此分配也可以从每线程凹凸分配器进行,使内存分配(实际上)无锁和 O(1)。
一旦一个扇区被完全处理,像素位置阵列中的所有数据都将被丢弃;然而,与扇区边缘像素对应的数据被复制到一个新的阵列中,并为下一阶段保留。
由于遍历整个图像是 O(x*y),而不相交合并实际上是 O(1),所以这个操作是 O(x*y) 并且需要工作内存 O(m+2*x*y/k+ k^2) = O(x*y/k+k^2),其中 t 是扇区数,k 是扇区的宽度,m 是扇区中部分线段的数量(取决于如何通常线条跨越边界,m 可能变化很大,但永远不会超过线段的数量)。结转到下一个操作的内存是 O(m + 2*x*y/k) = O(x*y/k)
第二阶段:跨部门合并
处理完所有扇区后,我们将转到合并跨扇区的行。对于扇区之间的每个边界,我们对跨越边界的行(即边界每一侧的相邻像素已分配给行集)执行无锁合并操作。
此操作的运行时间为 O(x+y) 并消耗 O(1) 内存(但是我们必须保留阶段 1 的内存)。完成后,可以丢弃边缘阵列。
第 3 阶段:收集线
我们现在对所有分配的不相交集结构对象执行多线程映射操作。我们首先跳过任何不是根的对象(即,其中 obj.parent != obj)。然后,从代表性线段开始,我们从那里移出并收集和记录有关相关线的任何所需信息。我们确信一次只有一个线程在查看任何给定的线,因为相交的线最终会出现在相同的不相交集结构中。
这具有 O(m) 运行时间,内存使用量取决于您需要收集有关这些线段的哪些信息。
总结
总的来说,这个算法应该有 O(x*y) 运行时间和 O(x*y/k + k^2) 内存使用。调整 k 可以在阶段 1 进程的 transient 内存使用与延续到阶段 2 的邻接数组和不相交集结构的长期内存使用之间进行权衡。
请注意,我还没有在现实世界中实际测试过该算法的性能;也有可能我在上面的无锁不相交集 union 合并算法中忽略了并发问题。欢迎评论:)
关于c - 非常大的图像中的内存高效行拼接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/4837465/