我正在玩一点 Java 流,我想出了一个问题的解决方案,我想与您分享,看看我的方法是否正确。
我已从 https://catalog.data.gov/dataset/consumer-complaint-database 下载了数据集拥有70万+条客户投诉记录。 我正在使用的信息如下:
公司名称 产品名称
我的目标是获得以下结果:
数据集中出现次数较多的 10 家公司
数据集中出现次数较多的 10 个产品
并得到类似的东西
Map<String, Map<String,Integer>>
其中,主Map的Key为公司名称,副Map的Key为产品名称,其值为该产品在该公司投诉的次数。
所以我所做的解决方案如下:
@Test
public void joinGroupingsTest() throws URISyntaxException, IOException {
String path = CsvReaderTest.class.getResource("/complains.csv").toURI().toString();
complains = CsvReader.readFileStreamComplain(path.substring(path.indexOf('/')+1));
Map<String, List<Complain>> byCompany = complains.parallelStream()
.collect(Collectors.groupingBy(Complain::getCompany))
.entrySet().stream()
.sorted((f1, f2) -> Long.compare(f2.getValue().size(), f1.getValue().size()))
.limit(10)
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
Map<String, List<Complain>> byProduct = complains.parallelStream()
.collect(Collectors.groupingBy(Complain::getProduct))
.entrySet().stream()
.sorted((f1, f2) -> Long.compare(f2.getValue().size(), f1.getValue().size()))
.limit(10)
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
Map<String, List<Complain>> map = complains.parallelStream()
.filter((x) -> byCompany.get(x.getCompany()) != null
&& byProduct.get(x.getProduct()) != null)
.collect(Collectors.groupingBy(Complain::getCompany));
Map<String, Map<String, Long>> map2 = map.entrySet().parallelStream()
.collect(Collectors.toMap(
e -> e.getKey(),
e -> e.getValue().stream()
.collect(Collectors.groupingBy(Complain::getProduct, Collectors.counting()))
));
System.out.println(map2);
}
如您所见,我有几个步骤来实现这一目标:
1) 我得到了发生次数较多的 10 家公司以及相关的投诉(记录)
2) 我得到了出现次数较多的 10 个产品以及相关的投诉(记录)
3) 我得到一张以公司名称为键的 map ,该 map 位于之前计算的前 10 名公司中,以及同样位于前 10 名产品中的产品的投诉
4)我进行所需的转换以获得我想要的 map 。
除了在两个不同的线程中 fork 和分离步骤 1 和 2 之外,是否还有其他考虑因素,我可能需要提高性能,甚至以更好的方式使用流。
谢谢!
最佳答案
在前两个操作中,您将集合分组到List
中,只是为了按其大小排序。这显然是对资源的浪费,因为您可以在分组时简单地对组元素进行计数,然后按计数进行排序。此外,由于前两个操作是相同的,除了分组功能之外,值得通过为任务创建一个方法来消除代码重复。
另外两个流操作可以通过在收集组时立即对组执行collect
操作来完成。
public void joinGroupingsTest() throws URISyntaxException, IOException {
String path = CsvReaderTest.class.getResource("/complains.csv").toURI().toString();
complains = CsvReader.readFileStreamComplain(path.substring(path.indexOf('/')+1));
Set<String> byCompany = getTopTen(complains, Complain::getCompany);
Set<String> byProduct = getTopTen(complains, Complain::getProduct);
Map<String, Map<String, Long>> map = complains.stream()
.filter(x -> byCompany.contains(x.getCompany())
&& byProduct.contains(x.getProduct()))
.collect(Collectors.groupingBy(Complain::getCompany,
Collectors.groupingBy(Complain::getProduct, Collectors.counting())));
System.out.println(map);
}
static <T,V> Set<V> getTopTen(Collection<T> source, Function<T,V> criteria) {
return source.stream()
.collect(Collectors.groupingBy(criteria, Collectors.counting()))
.entrySet().stream()
.sorted(Map.Entry.comparingByValue())
.limit(10)
.map(Map.Entry::getKey)
.collect(Collectors.toSet());
}
请注意,两个条件的交集可能小于十个元素,甚至可能为空。您可能会重新考虑这种情况。
此外,您应该始终重新检查数据量是否确实足够大以从并行处理中受益。另请注意,getTopTen
操作由两个流操作组成。将第一个切换为并行不会改变第二个的性质。
关于java - JAVA 中的 Streams 聚合,这是一个好方法吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44032340/