redis實現隊列消息的ack

NoSQL Redis 腳本語言 Lua 達人科技 2017-04-05

由於公司提供的隊列實在太過於蛋疼而且還限制不能使用其他隊列,但為了保證數據安全性需要一個可以有ack功能的隊列。

原生的redis中通過L/R PUSH/POP方式來實現隊列的功能,這個當然是沒辦法滿足需求的(沒有ack功能),所以需要自己對redis的list(隊列)做個小小的調整。

大體思路為在POP時將pop出的數據放到備份的地方,當有ACK請求(確認消息被消耗)後將備份的信息刪除掉;每次在pop前需要檢查備份隊列中有沒有過期的數據沒有ack的,如果有則PUSH到list中後再從list中POP出來。

以下腳本使用lua實現,只需要在執行前加載到redis中即可。

消息本身需要包含id屬性

push沒什麼問題,原生即可(此處以LPUSH為例)

pop時腳本

 1 local not_empty = function(x)
 2     return (type(x) == "table") and (not x.err) and (#x ~= 0)
 3 end
 4 
 5 local qName = ARGV[1] --隊列名稱
 6 local currentTime = ARGV[2] --當前時間,這個需要從外部傳入,不能使用redis自身時間,如果使用自身時間可能導致redis本身的backup在重放請求時出現不一致性
 7 local considerAsFailMaxTimeSpan = ARGV[3] --超時時間設定,當消息超過一定時間還沒有ack則認為此消息需要再次入隊
 8 
 9 local zsetName= qName ..'BACKUP'
10 local hashName= qName ..'CONTEXT'
11 
12 local tmp = redis.call('ZRANGEBYSCORE',zsetName , '-INF', tonumber(currentTime) - tonumber(considerAsFailMaxTimeSpan), 'LIMIT', 0, 1)
13 if (not_empty(tmp)) then
14     redis.call('ZREM', zsetName, tmp[1]) --此處拿出的為消息的唯一id
15     redis.call('LPUSH', qName, redis.call('HGET', hashName, tmp[1]))
16 end
17 tmp = redis.call('RPOP', qName)
18 if (tmp) then
19     local msg = cjson.decode(tmp)
20     local id = msg['id']
21     redis.call('ZADD', zsetName, tonumber(currentTime), id)
22     redis.call('HSET',hashName , id, tmp)
23 end
24 return tmp

ack時候比較簡單,只需要將指定id從set和hash中刪除即可

1 local key = ARGV[1]
2 local qName=ARGV[2]
3 redis.call('ZREM', qName..'BACKUP', key)
4 redis.call('HDEL', qName..'CONTEXT', key)

在程序中使用前需要顯示load這兩個腳本,後面直接調用這兩個腳本的sha值即可執行。

相關推薦

推薦中...