BloomFilter(大數據去重)+Redis(持久化)策略

背景

之前在重構一套文章爬蟲系統時,其中有塊邏輯是根據文章標題去重,原先去重的方式是,插入文章之前檢查待插入文章的標題是否在ElasticSearch中存在,這無疑加重了ElasticSearch的負擔也勢必會影響程序的性能!

BloomFilter算法

簡介:布隆過濾器實際上是一個很長的二進制向量一系列隨機映射函數。布隆過濾器可以用於檢索一個元素是否在一個集合中。它的優點是空間效率和查詢時間都遠遠超過一般的算法,缺點是有一定的誤識別率和刪除困難。

原理:當一個元素被加入集合時,通過K個散列函數將這個元素映射成一個位數組中的K個點,把它們置為1。檢索時,我們只要看看這些點是不是都是1就(大約)知道集合中有沒有它了:如果這些點有任何一個0,則被檢元素一定不在;如果都是1,則被檢元素很可能在

優點

相比於其它的數據結構,布隆過濾器在空間和時間方面都有巨大的優勢。布隆過濾器存儲空間和插入/查詢時間都是常數(O(k))。而且它不存儲元素本身,在某些對保密要求非常嚴格的場合有優勢。

缺點:

一定的誤識別率和刪除困難。

結合以上幾點及去重需求(容忍誤判,會誤判在,在則丟,無妨),決定使用BlomFilter。

思想

位數組k個散列函數

位數組

初始狀態時,BloomFilter是一個長度為m的位數組,每一位都置為0。

添加元素(k個獨立的hash函數)

添加元素時,對x使用k個哈希函數得到k個哈希值,對m取餘,對應的bit位設置為1。

判斷元素是否存在

判斷y是否屬於這個集合,對y使用k個哈希函數得到k個哈希值,對m取餘,所有對應的位置都是1,則認為y屬於該集合(哈希衝突,可能存在誤判),否則就認為y不屬於該集合。

圖中y1不是集合中的元素,y2屬於這個集合或者是一個false positive。

實現

可以使用JDK自帶的BitSet來實現,但存在兩個問題:OOM和持久化問題

結合Redis的BitMap能夠解決,唯一需要注意的是Redis的BitMap只支持2^32大小,對應到內存也就是512MB,數組的下標最大隻能是2^32-1。不過這個限制可以通過構建多個Redis的Bitmap通過hash取模的方式分散一下即可。萬分之一的誤判率,512MB可以放下2億條數據。

好了,扯了這麼多,貼代碼!(注:在MagnusS/Java-BloomFilter的基礎上加上了Redis持久化的實現)

@Component
publicclassBloomFilter<E>{

@Autowired
privateRedisTemplate<String, E> redisTemplate;

@Value("${bloomfilter.expireDays}")
privatelong expireDays;

// total length of theBloom filter
privateint sizeOfBloomFilter;
// expected (maximum)number of elements to be added
privateint expectedNumberOfFilterElements;
// number of hash functions
privateint numberOfHashFunctions;
// encoding used forstoring hash values as strings
privatefinalCharset charset = Charset.forName("UTF-8");
// MD5 gives good enoughaccuracy in most circumstances. Change to SHA1 if it's needed
privatestaticfinalString hashName ="MD5";
privatestaticfinalMessageDigest digestFunction;

// The digest method isreused between instances
static {
MessageDigest tmp;
try{
tmp = java.security.MessageDigest.getInstance(hashName);
}catch(NoSuchAlgorithmException e) {
tmp =null;
}
digestFunction = tmp;
}

public BloomFilter() {
this(0.0001,600000);
}

/**
* Constructs an empty Bloomfilter.
*
*@paramm is the total length ofthe Bloom filter.
*@paramn is the expected number ofelements the filter will contain.
*@paramk is the number of hashfunctions used.
*/
public BloomFilter(int m, int n, int k) {
this.sizeOfBloomFilter = m;
this.expectedNumberOfFilterElements = n;
this.numberOfHashFunctions = k;
}

/**
* Constructs an empty Bloomfilter with a given false positive probability.
* The size of bloom filterand the number of hash functions is estimated
* to match the falsepositive probability.
*
*@paramfalsePositiveProbability isthe desired false positive probability.
*@paramexpectedNumberOfElements isthe expected number of elements in the Bloom filter.
*/
public BloomFilter(double falsePositiveProbability, intexpectedNumberOfElements) {
this((int) Math.ceil((int)Math.ceil(-(Math.log(falsePositiveProbability) / Math.log(2))) * expectedNumberOfElements / Math.log(2)),// m = ceil(kn/ln2)
expectedNumberOfElements,
(int) Math.ceil(-(Math.log(falsePositiveProbability) /Math.log(2))));// k = ceil(-ln(f)/ln2)
}

/**
* Adds an object to theBloom filter. The output from the object's
* toString() method is usedas input to the hash functions.
*
*@paramelement is an element toregister in the Bloom filter.
*/
public void add(E element) {
add(element.toString().getBytes(charset));
}

/**
* Adds an array of bytes tothe Bloom filter.
*
*@parambytes array of bytes to addto the Bloom filter.
*/
public void add(byte[] bytes) {
if(redisTemplate.opsForValue().get(RedisConsts.CRAWLER_BLOOMFILTER)==null) {
redisTemplate.opsForValue().setBit(RedisConsts.CRAWLER_BLOOMFILTER,0,false);
redisTemplate.expire(RedisConsts.CRAWLER_BLOOMFILTER,expireDays, TimeUnit.DAYS);
}

int[] hashes = createHashes(bytes, numberOfHashFunctions);
for(int hash : hashes) {
redisTemplate.opsForValue().setBit(RedisConsts.CRAWLER_BLOOMFILTER,Math.abs(hash % sizeOfBloomFilter),true);
}
}

/**
* Adds all elements from aCollection to the Bloom filter.
*
*@paramc Collection of elements.
*/
public void addAll(Collection<?extendsE> c) {
for(E element : c) {
add(element);
}
}

/**
* Returns true if theelement could have been inserted into the Bloom filter.
* UsegetFalsePositiveProbability() to calculate the probability of this
* being correct.
*
*@paramelement element to check.
*@returntrue if the element couldhave been inserted into the Bloom filter.
*/
public boolean contains(E element) {
returncontains(element.toString().getBytes(charset));
}

/**
* Returns true if the arrayof bytes could have been inserted into the Bloom filter.
* UsegetFalsePositiveProbability() to calculate the probability of this
* being correct.
*
*@parambytes array of bytes tocheck.
*@returntrue if the array couldhave been inserted into the Bloom filter.
*/
public boolean contains(byte[] bytes) {
int[] hashes = createHashes(bytes, numberOfHashFunctions);
for(int hash : hashes) {
if(!redisTemplate.opsForValue().getBit(RedisConsts.CRAWLER_BLOOMFILTER,Math.abs(hash % sizeOfBloomFilter))) {
returnfalse;
}
}
returntrue;
}

/**
* Returns true if all theelements of a Collection could have been inserted
* into the Bloom filter.Use getFalsePositiveProbability() to calculate the
* probability of this beingcorrect.
*
*@paramc elements to check.
*@returntrue if all the elements inc could have been inserted into the Bloom filter.
*/
public boolean containsAll(Collection<?extendsE> c) {
for(E element : c) {
if(!contains(element)) {
returnfalse;
}
}
returntrue;
}

/**
* Generates digests basedon the contents of an array of bytes and splits the result into 4-byte int'sand store them in an array. The
* digest function is calleduntil the required number of int's are produced. For each call to digest a salt
* is prepended to the data.The salt is increased by 1 for each call.
*
*@paramdata specifies input data.
*@paramhashes number ofhashes/int's to produce.
*@returnarray of int-sized hashes
*/
public static int[] createHashes(byte[] data, int hashes) {
int[] result =newint[hashes];

int k =0;
byte salt =0;
while(k < hashes) {
byte[] digest;
synchronized (digestFunction) {
digestFunction.update(salt);
salt++;
digest = digestFunction.digest(data);
}

for(int i =0; i < digest.length /4&& k < hashes; i++) {
int h =0;
for(int j = (i *4); j < (i *4) +4; j++) {
h <<=8;
h |= ((int) digest[j]) &0xFF;
}
result[k] = h;
k++;
}
}
returnresult;
}

public int getSizeOfBloomFilter() {
returnthis.sizeOfBloomFilter;
}

public int getExpectedNumberOfElements() {
returnthis.expectedNumberOfFilterElements;
}

public int getNumberOfHashFunctions() {
returnthis.numberOfHashFunctions;
}

/**
* Compares the contents oftwo instances to see if they are equal.
*
*@paramobj is the object tocompare to.
*@returnTrue if the contents of theobjects are equal.
*/
@Override
public boolean equals(Object obj) {
if(obj ==null) {
returnfalse;
}
if(getClass() != obj.getClass()) {
returnfalse;
}
finalBloomFilter<E> other = (BloomFilter<E>) obj;
if(this.sizeOfBloomFilter!= other.sizeOfBloomFilter) {
returnfalse;
}
if(this.expectedNumberOfFilterElements!= other.expectedNumberOfFilterElements) {
returnfalse;
}
if(this.numberOfHashFunctions!= other.numberOfHashFunctions) {
returnfalse;
}
returntrue;
}

/**
* Calculates a hash codefor this class.
*
*@returnhash code representing thecontents of an instance of this class.
*/
@Override
public int hashCode() {
int hash =7;
hash =61* hash +this.sizeOfBloomFilter;
hash =61* hash +this.expectedNumberOfFilterElements;
hash =61* hash +this.numberOfHashFunctions;
returnhash;
}

public static void main(String[] args) {
BloomFilter<String> bloomFilter =newBloomFilter<>(0.0001,600000);
System.out.println(bloomFilter.getSizeOfBloomFilter());
System.out.println(bloomFilter.getNumberOfHashFunctions());
}
}

相關推薦

推薦中...