java - 如何进行多线程 Merkle 树哈希

标签 java multithreading hash merkle-tree

我有一个很大的列表,我希望能够获得java中的默克尔根。它足够大,能够对进程进行多线程处理将显着加快速度,因此,我一直在尝试这样做。

这是我到目前为止的代码:

public static byte[] multiMerkleRoot(ArrayList<byte[]> temp) {
    int count = temp.size();
    List<byte[]> hashList = new ArrayList<>();

    for(byte[] o : temp) {
        hashList.add(merkleHash(o));
    }

    if (count % 2 == 0) {
        return getRoot(hashList);
    } else {
        return merkleHash(concat(getRoot(hashList.subList(0, hashList.size() - 1)), hashList.get(hashList.size() - 1)));
    }
}

private static byte[] getRoot(List<byte[]> temp) {
    if(temp.size() % 2 != 0) {
        return merkleHash(concat(getRoot(temp.subList(0, temp.size() - 1)), temp.get(temp.size() - 1)));
    } else {
        if (temp.size() > 2) {
            List<List<byte[]>> subsets = Lists.partition(temp, temp.size() / 2);

            return merkleHash(concat(getRoot(subsets.get(0)), getRoot(subsets.get(1))));
        } else {
            return merkleHash(concat(temp.get(0), temp.get(1)));
        }
    }
}

public static byte[] trueMultiMerkleRoot(ArrayList<byte[]> temp, int threads) {
    try {
        int count = temp.size();
        List<byte[]> hashList = new ArrayList<>();

        for(byte[] o : temp) {
            hashList.add(merkleHash(o));
        }

        if(count % 2 == 0) {
            byte[] chunk1 = null;

            switch(threads) {
                case 1: chunk1 = getRoot(hashList);
                        break;
                case 2: chunk1 = twoThreadMerkle(hashList);
                        break;
                default: System.out.println("You can only have the following threadcounts: 1, 2, 4, 8.");
                        break;
            }

            return chunk1;
        } else {
            byte[] chunk1 = null;
            byte[] chunk2 = hashList.get(hashList.size() - 1);

            switch(threads) {
                case 1: chunk1 = getRoot(hashList.subList(0, hashList.size() - 1));
                    break;
                case 2: chunk1 = twoThreadMerkle(hashList.subList(0, hashList.size() - 1));
                    break;
                default: System.out.println("You can only have the following threadcounts: 1, 2, 4, 8.");
                    break;
            }

            return chunk1;
        }
    } catch(Exception e) {
        return null;
    }
}

private static byte[] twoThreadMerkle(List<byte[]> temp) throws Exception {
    if (!(temp.size() >= 2)) {
        return twoThreadMerkle(temp);
    } else {
        if(temp.size() % 2 != 0) {
            return getRoot(temp);
        } else {
            List<List<byte[]>> subsets = Lists.partition(temp, temp.size() / 2);

            Executor exe1 = Executors.newSingleThreadExecutor();
            Executor exe2 = Executors.newSingleThreadExecutor();

            Future<byte[]> fut1 = ((ExecutorService) exe1).submit(() -> getRoot(subsets.get(0)));
            Future<byte[]> fut2 = ((ExecutorService) exe2).submit(() -> getRoot(subsets.get(1)));

            while ((!fut1.isDone()) || (!fut2.isDone())) {
                Thread.sleep(500);
            }

            return merkleHash(concat(fut1.get(), fut2.get()));
        }
    }
}

multiMerkleRoot是单线程版本,trueMultiMerkleRoot是多线程版本的尝试。

这是我的问题:无论我使用什么大小的列表(我尝试过使用 2 的精确幂、奇数、偶数、小和大),我总是从这两种方法中得到两个不同的答案,而且我一生都无法弄清楚如何解决这个问题。

在此实现中,merkleHash() 只是 Keccak 256 的包装器,我用它对要连接的两个字节数组进行哈希处理。

如果有人能以任何方式帮助我,无论是向我展示我的代码哪里出了问题以及如何修复它,还是只是让我的代码着火并向我展示如何正确执行它,我非常感谢您的帮助。

编辑:在我意识到以前的方法存在一些问题后,我尝试了一种不同的方法。然而,这个仍然不会多线程,尽管我认为它更接近。

这是我的新代码:

package crypto;

import org.bouncycastle.util.encoders.Hex;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.*;

import static crypto.Hash.keccak256;
import static util.ByteUtil.concat;

public class Merkle {
    private Queue<byte[]> data;

    public Merkle() {
        this.data = new LinkedList<>();
    }

    public Merkle(ArrayList<byte[]> in) {
        this.data = new LinkedList<>();

        this.data.addAll(in);
    }

    public void add(List<byte[]> in) {
        data.addAll(in);
    }

    public void add(byte[] in) {
        data.add(in);
    }

    public byte[] hash() {
        Queue<byte[]> nextLevel = new LinkedList<>();

        while((data.size() > 1) || (nextLevel.size() > 1)) {
            while(data.size() > 0) {
                if(data.size() > 1) {
                    nextLevel.add(merkleHash(data.remove(), data.remove()));
                } else {
                    nextLevel.add(data.remove());
                }

            }

            data.addAll(nextLevel);

            nextLevel.clear();
        }

        return data.remove();
    }

    private byte[] hash(Queue<byte[]> data) {
        Queue<byte[]> nextLevel = new LinkedList<>();

        while((data.size() > 1) || (nextLevel.size() > 1)) {

            while(data.size() > 0) {
                if(data.size() > 1) {
                    nextLevel.add(merkleHash(data.remove(), data.remove()));
                } else {
                    nextLevel.add(data.remove());
                }

            }

            data.addAll(nextLevel);

            nextLevel.clear();
        }

        return data.remove();
    }

    public byte[] dualHash() throws Exception {
        Queue<byte[]> temp1 = new LinkedList<>();
        Queue<byte[]> temp2 = new LinkedList<>();

        if(data.size() == Math.pow(2, log2(data.size()))) return hash();

        int temponesize = (int)Math.pow(2, log2(data.size()) + 1) / 2;
        while(temp1.size() < temponesize) {
            temp1.add(data.remove());
        }

        while(!data.isEmpty()) {
            temp2.add(data.remove());
        }

        /*
        ExecutorService exe1 = Executors.newSingleThreadExecutor();
        ExecutorService exe2 = Executors.newSingleThreadExecutor();
        Callable<byte[]> call1 = new Callable<byte[]>() {
            @Override
            public byte[] call() throws Exception {
                return hash(temp1);
            }
        };
        Callable<byte[]> call2 = new Callable<byte[]>() {
            @Override
            public byte[] call() throws Exception {
                return hash(temp2);
            }
        };

        Future<byte[]> fut1 = exe1.submit(call1);
        Future<byte[]> fut2 = exe2.submit(call2);
        */

        byte[] tem1 = hash(temp1);
        byte[] tem2 = hash(temp2);



        return merkleHash(tem1, tem2);
    }

    public int size() {
        return data.size();
    }

    private byte[] merkleHash(byte[] a, byte[] b) {
        return keccak256(concat(a, b));
    }

    private byte[] merkleHash(byte[] a) {
        return keccak256(a);
    }

    private int log2(int x) {
        return (int)Math.floor((Math.log(x))/(Math.log(2)));
    }
}

如果我们专门看一下 DualHash 方法,在这种情况下,它会起作用并给出与哈希方法相同的结果。但是,当我尝试将其委​​托给两个线程时,如下所示:

public byte[] dualHash() throws Exception {
        Queue<byte[]> temp1 = new LinkedList<>();
        Queue<byte[]> temp2 = new LinkedList<>();

        if(data.size() == Math.pow(2, log2(data.size()))) return hash();

        int temponesize = (int)Math.pow(2, log2(data.size()) + 1) / 2;
        while(temp1.size() < temponesize) {
            temp1.add(data.remove());
        }

        while(!data.isEmpty()) {
            temp2.add(data.remove());
        }

        ExecutorService exe1 = Executors.newSingleThreadExecutor();
        ExecutorService exe2 = Executors.newSingleThreadExecutor();
        Callable<byte[]> call1 = new Callable<byte[]>() {
            @Override
            public byte[] call() throws Exception {
                return hash(temp1);
            }
        };
        Callable<byte[]> call2 = new Callable<byte[]>() {
            @Override
            public byte[] call() throws Exception {
                return hash(temp2);
            }
        };

        Future<byte[]> fut1 = exe1.submit(call1);
        Future<byte[]> fut2 = exe2.submit(call2);

        byte[] tem1 = fut1.get();
        byte[] tem2 = fut2.get();



        return merkleHash(tem1, tem2);
    }

它不再给我预期的结果。 知道为什么吗?

谢谢!

最佳答案

找到解决方案!

事实证明我的代码不是问题(至少是编辑后的代码,我 100% 确定第一个代码块完全错误)。问题是,两个线程在都使用 MessageDigest 的一个实例时尝试对结果进行哈希处理。现在我已经强制他们使用单独的实例,代码运行得很好。

关于java - 如何进行多线程 Merkle 树哈希,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50615854/

相关文章:

java - 有没有办法从抽象类的具体方法访问具体类变量值

java - 如何从 Java 生成 CloudFormation 代码?

c# - 在 .NET 中为方法和线程计时

java - 外部结束无限循环java

java - 如何文件行尾并增加变量

python - 如何立即重新引发任何工作线程中抛出的异常?

具有雪崩效应的java字符串哈希函数

c# - 尝试在 C# 上复制 JavaScript 哈希值

c++ - 具有任意数量基本类型属性的对象的 std::hash 变体

java - 如果从 UncaughtExceptionHandler 内部抛出异常会怎样?