java - 在减少侧联接中,bloom filter

  显示原文与译文双语对照的内容
97 4

我目前正在探索 bloom filter 。我已经经历了on上的大部分博客,并且了解了但仍然无法找到一个例子。

每篇文章都说会减少网络 I/O 但它们都不会显示?特别是一个很好的 http://vanjakom.wordpress.com/tag/distributed-cache/with,但它似乎像我刚刚开始的地图减少一样复杂。

是否可以帮助我在下面的示例中实现bloom过滤器( reduceside连接)

要读取用户记录和部门记录以及要加入的减速器,请花费 mapers

用户记录

id,名称

3738,Richie 。戈尔

12946,Rony Sam

17556,David Gart

3443,雷切尔史密斯

5799,Paul Rosta

部门记录

3738,销售

12946,市场营销

17556,市场营销

3738,销售

3738,销售

代码

public class UserMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text>{
 private Text outkey = new Text();
 private Text outval = new Text();
 private String id, name;
public void map (LongWritable key, Text value, OutputCollector<Text, Text> ouput,Reporter reporter)
 throws IOException {
 String line = value.toString();
 String arryUsers[] = line.split(",");
 id = arryUsers[0].trim();
 name = arryUsers[1].trim();
 outkey.set(id);
 outval.set("A"+ name);
 ouput.collect(outkey, outval);
 }
 }
public class DepartMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
private Text Outk = new Text();
private Text Outv = new Text();
String depid, dep ;
public void map (LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
 String line = value.toString();
 String arryDept[] = line.split(",");
 depid = arryDept[0].trim();
 dep = arryDept[1].trim();
 Outk.set(depid);
 Outv.set("B" + dep);
 output.collect(Outk, Outv);
}
 }

减速器

ublic class JoinReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
private Text tmp = new Text();
private ArrayList<Text> listA = new ArrayList<Text>();
private ArrayList<Text> listB = new ArrayList<Text>();
public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text>output, Reporter reporter) throws IOException {
 listA.clear();
 listB.clear();
 while (values.hasNext()) {
 tmp = values.next();
 if (tmp.charAt(0) == 'A') {
 listA.add(new Text(tmp.toString().substring(1)));
 } else if (tmp.charAt(0) == 'B') {
 listB.add(new Text(tmp.toString().substring(1)));
 }
 }
 executejoinlogic(output);
}
private void executejoinlogic(OutputCollector<Text, Text> output) throws IOException {
 if (!listA.isEmpty() &&!listB.isEmpty()) {
 for (Text A : listA) {
 for (Text B : listB) {
 output.collect(A, B);
 }
 }
 }
 }
 }

在上述场景中实现 bloom filter有可能?

如果是的话,请帮我实现这个?

时间:原作者:1个回答

69 4

只有当两个输入表中的一个比另一个输入表小得多时,才可以在此实现bloom筛选器。你需要遵循的过程如下:

  1. 在Mapper类( 过滤器对象本身应该是全局的,以便 map() 方法可以在以后访问它)的setup() 方法中初始化bloom过滤器:

    filter = new BloomFilter(VECTOR_SIZE,NB_HASH,HASH_TYPE);

  2. 将较小的表读取到映射器的setup() 方法中。

  3. 将每个记录的ID添加到bloom过滤器中:

    filter.add(ID);

  4. map() 方法本身中,在来自较大输入源的任何id上使用 filter.membershipTest(ID)如果没有匹配,则知道,不存在于较小的数据集中,因这里不应该传递给减小器。

  5. 请记住,你将在减速器中获得 false 正确性,所以不要假设所有的。
原作者:
...