mirror of
https://github.com/dkam/suo.git
synced 2025-01-29 07:42:43 +00:00
Compare commits
11 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
155a3ac40c | ||
|
|
10d3ab09cf | ||
|
|
2724ec6d9d | ||
|
|
185327f59c | ||
|
|
c8a972da31 | ||
|
|
7591c08a28 | ||
|
|
1dee338e16 | ||
|
|
900c723043 | ||
|
|
6e2afdf80a | ||
|
|
49a9757d44 | ||
|
|
754d7d8faf |
@@ -1,3 +1,10 @@
|
||||
## 0.2.2
|
||||
|
||||
- Fix bug with refresh - typo would've prevented real use.
|
||||
- Clean up code.
|
||||
- Improve documentation a bit.
|
||||
- 100% test coverage.
|
||||
|
||||
## 0.2.1
|
||||
|
||||
- Fix bug when dealing with real-world Redis error conditions.
|
||||
|
||||
29
README.md
29
README.md
@@ -1,4 +1,4 @@
|
||||
# Suo [](https://travis-ci.org/nickelser/suo) [](http://badge.fury.io/rb/suo)
|
||||
# Suo [](https://travis-ci.org/nickelser/suo) [](https://codeclimate.com/github/nickelser/suo) [](https://codeclimate.com/github/nickelser/suo) [](http://badge.fury.io/rb/suo)
|
||||
|
||||
:lock: Distributed semaphores using Memcached or Redis in Ruby.
|
||||
|
||||
@@ -41,16 +41,35 @@ Thread.new { suo.lock("other_key", 2) { puts "Three" } }
|
||||
suo = Suo::Client::Memcached.new(client: some_dalli_client, acquisition_timeout: 1) # in seconds
|
||||
|
||||
# manually locking/unlocking
|
||||
suo.lock("a_key")
|
||||
# the return value from lock without a block is a unique token valid only for the current lock
|
||||
# which must be unlocked manually
|
||||
lock = suo.lock("a_key")
|
||||
foo.baz!
|
||||
suo.unlock("a_key")
|
||||
suo.unlock("a_key", lock)
|
||||
|
||||
# custom stale lock cleanup (cleaning of dead clients)
|
||||
# custom stale lock expiration (cleaning of dead locks)
|
||||
suo = Suo::Client::Redis.new(client: some_redis_client, stale_lock_expiration: 60*5)
|
||||
```
|
||||
|
||||
### Stale locks
|
||||
|
||||
"Stale locks" - those acquired more than `stale_lock_expiration` (defaulting to 3600 or one hour) ago - are automatically cleared during any operation on the key (`lock`, `unlock`, `refresh`). The `locked?` method will not return true if only stale locks exist, but will not modify the key itself.
|
||||
|
||||
To re-acquire a lock in the middle of a block, you can use the refresh method on client.
|
||||
|
||||
```ruby
|
||||
suo = Suo::Client::Redis.new
|
||||
|
||||
# lock is the same token as seen in the manual example, above
|
||||
suo.lock("foo") do |lock|
|
||||
5.times do
|
||||
baz.bar!
|
||||
suo.refresh("foo", lock)
|
||||
end
|
||||
end
|
||||
```
|
||||
|
||||
## TODO
|
||||
- better stale key handling (refresh blocks)
|
||||
- more race condition tests
|
||||
|
||||
## History
|
||||
|
||||
@@ -24,7 +24,7 @@ module Suo
|
||||
|
||||
if block_given? && token
|
||||
begin
|
||||
yield
|
||||
yield(token)
|
||||
ensure
|
||||
unlock(key, token)
|
||||
end
|
||||
@@ -39,9 +39,9 @@ module Suo
|
||||
|
||||
def locks(key)
|
||||
val, _ = get(key)
|
||||
locks = deserialize_locks(val)
|
||||
cleared_locks = deserialize_and_clear_locks(val)
|
||||
|
||||
locks
|
||||
cleared_locks
|
||||
end
|
||||
|
||||
def refresh(key, acquisition_token)
|
||||
@@ -53,11 +53,11 @@ module Suo
|
||||
next
|
||||
end
|
||||
|
||||
locks = deserialize_and_clear_locks(val)
|
||||
cleared_locks = deserialize_and_clear_locks(val)
|
||||
|
||||
refresh_lock(locks, acquisition_token)
|
||||
refresh_lock(cleared_locks, acquisition_token)
|
||||
|
||||
break if set(key, serialize_locks(locks), cas)
|
||||
break if set(key, serialize_locks(cleared_locks), cas)
|
||||
end
|
||||
end
|
||||
|
||||
@@ -69,12 +69,12 @@ module Suo
|
||||
|
||||
break if val.nil?
|
||||
|
||||
locks = deserialize_and_clear_locks(val)
|
||||
cleared_locks = deserialize_and_clear_locks(val)
|
||||
|
||||
acquisition_lock = remove_lock(locks, acquisition_token)
|
||||
acquisition_lock = remove_lock(cleared_locks, acquisition_token)
|
||||
|
||||
break unless acquisition_lock
|
||||
break if set(key, serialize_locks(locks), cas)
|
||||
break if set(key, serialize_locks(cleared_locks), cas)
|
||||
end
|
||||
rescue LockClientError => _ # rubocop:disable Lint/HandleExceptions
|
||||
# ignore - assume success due to optimistic locking
|
||||
@@ -98,12 +98,12 @@ module Suo
|
||||
next
|
||||
end
|
||||
|
||||
locks = deserialize_and_clear_locks(val)
|
||||
cleared_locks = deserialize_and_clear_locks(val)
|
||||
|
||||
if locks.size < resources
|
||||
add_lock(locks, token)
|
||||
if cleared_locks.size < resources
|
||||
add_lock(cleared_locks, token)
|
||||
|
||||
newval = serialize_locks(locks)
|
||||
newval = serialize_locks(cleared_locks)
|
||||
|
||||
if set(key, newval, cas)
|
||||
acquisition_token = token
|
||||
@@ -119,11 +119,11 @@ module Suo
|
||||
fail NotImplementedError
|
||||
end
|
||||
|
||||
def set(key, newval, oldval) # rubocop:disable Lint/UnusedMethodArgument
|
||||
def set(key, newval, cas) # rubocop:disable Lint/UnusedMethodArgument
|
||||
fail NotImplementedError
|
||||
end
|
||||
|
||||
def initial_set(key) # rubocop:disable Lint/UnusedMethodArgument
|
||||
def initial_set(key, val = "") # rubocop:disable Lint/UnusedMethodArgument
|
||||
fail NotImplementedError
|
||||
end
|
||||
|
||||
@@ -162,7 +162,7 @@ module Suo
|
||||
unpacked.map do |time, token|
|
||||
[Time.at(time), token]
|
||||
end
|
||||
rescue EOFError => _
|
||||
rescue EOFError, MessagePack::MalformedFormatError => _
|
||||
[]
|
||||
end
|
||||
|
||||
@@ -171,8 +171,8 @@ module Suo
|
||||
locks.reject { |time, _| time < expired }
|
||||
end
|
||||
|
||||
def add_lock(locks, token)
|
||||
locks << [Time.now.to_f, token]
|
||||
def add_lock(locks, token, time = Time.now.to_f)
|
||||
locks << [time, token]
|
||||
end
|
||||
|
||||
def remove_lock(locks, acquisition_token)
|
||||
@@ -182,7 +182,7 @@ module Suo
|
||||
|
||||
def refresh_lock(locks, acquisition_token)
|
||||
remove_lock(locks, acquisition_token)
|
||||
add_lock(locks, token)
|
||||
add_lock(locks, acquisition_token)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@@ -20,8 +20,8 @@ module Suo
|
||||
@client.set_cas(key, newval, cas)
|
||||
end
|
||||
|
||||
def initial_set(key)
|
||||
@client.set(key, "")
|
||||
def initial_set(key, val = "")
|
||||
@client.set(key, val)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@@ -32,8 +32,8 @@ module Suo
|
||||
@client.unwatch
|
||||
end
|
||||
|
||||
def initial_set(key)
|
||||
@client.set(key, "")
|
||||
def initial_set(key, val = "")
|
||||
@client.set(key, val)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
module Suo
|
||||
VERSION = "0.2.1"
|
||||
VERSION = "0.2.2"
|
||||
end
|
||||
|
||||
@@ -30,4 +30,5 @@ Gem::Specification.new do |spec|
|
||||
spec.add_development_dependency "rake", "~> 10.0"
|
||||
spec.add_development_dependency "rubocop", "~> 0.30.0"
|
||||
spec.add_development_dependency "minitest", "~> 5.5.0"
|
||||
spec.add_development_dependency "codeclimate-test-reporter", "~> 0.4.7"
|
||||
end
|
||||
|
||||
@@ -3,6 +3,10 @@ require "test_helper"
|
||||
TEST_KEY = "suo_test_key".freeze
|
||||
|
||||
module ClientTests
|
||||
def client(options)
|
||||
@client.class.new(options.merge(client: @client.client))
|
||||
end
|
||||
|
||||
def test_throws_failed_error_on_bad_client
|
||||
assert_raises(Suo::LockClientError) do
|
||||
client = @client.class.new(client: {})
|
||||
@@ -27,6 +31,23 @@ module ClientTests
|
||||
assert_equal false, locked
|
||||
end
|
||||
|
||||
def test_empty_lock_on_invalid_data
|
||||
@client.send(:initial_set, TEST_KEY, "bad value")
|
||||
locked = @client.locked?(TEST_KEY)
|
||||
assert_equal false, locked
|
||||
end
|
||||
|
||||
def test_clear
|
||||
lock1 = @client.lock(TEST_KEY, 1)
|
||||
refute_nil lock1
|
||||
|
||||
@client.clear(TEST_KEY)
|
||||
|
||||
locked = @client.locked?(TEST_KEY, 1)
|
||||
|
||||
assert_equal false, locked
|
||||
end
|
||||
|
||||
def test_multiple_resource_locking
|
||||
lock1 = @client.lock(TEST_KEY, 2)
|
||||
refute_nil lock1
|
||||
@@ -88,13 +109,14 @@ module ClientTests
|
||||
|
||||
assert_equal 0, output.size
|
||||
assert_equal %w(One Two), ret
|
||||
assert_equal false, @client.locked?(TEST_KEY)
|
||||
end
|
||||
|
||||
def test_block_multiple_resource_locking
|
||||
success_counter = Queue.new
|
||||
failure_counter = Queue.new
|
||||
|
||||
client = @client.class.new(acquisition_timeout: 0.9, client: @client.client)
|
||||
client = client(acquisition_timeout: 0.9)
|
||||
|
||||
100.times.map do |i|
|
||||
Thread.new do
|
||||
@@ -109,13 +131,14 @@ module ClientTests
|
||||
|
||||
assert_equal 50, success_counter.size
|
||||
assert_equal 50, failure_counter.size
|
||||
assert_equal false, client.locked?(TEST_KEY)
|
||||
end
|
||||
|
||||
def test_block_multiple_resource_locking_longer_timeout
|
||||
success_counter = Queue.new
|
||||
failure_counter = Queue.new
|
||||
|
||||
client = @client.class.new(acquisition_timeout: 3, client: @client.client)
|
||||
client = client(acquisition_timeout: 3)
|
||||
|
||||
100.times.map do |i|
|
||||
Thread.new do
|
||||
@@ -130,6 +153,153 @@ module ClientTests
|
||||
|
||||
assert_equal 100, success_counter.size
|
||||
assert_equal 0, failure_counter.size
|
||||
assert_equal false, client.locked?(TEST_KEY)
|
||||
end
|
||||
|
||||
def test_unstale_lock_acquisition
|
||||
success_counter = Queue.new
|
||||
failure_counter = Queue.new
|
||||
|
||||
client = client(stale_lock_expiration: 0.5)
|
||||
|
||||
t1 = Thread.new { client.lock(TEST_KEY) { sleep 0.6; success_counter << 1 } }
|
||||
sleep 0.3
|
||||
t2 = Thread.new do
|
||||
locked = client.lock(TEST_KEY) { success_counter << 1 }
|
||||
failure_counter << 1 unless locked
|
||||
end
|
||||
|
||||
[t1, t2].map(&:join)
|
||||
|
||||
assert_equal 1, success_counter.size
|
||||
assert_equal 1, failure_counter.size
|
||||
assert_equal false, client.locked?(TEST_KEY)
|
||||
end
|
||||
|
||||
def test_stale_lock_acquisition
|
||||
success_counter = Queue.new
|
||||
failure_counter = Queue.new
|
||||
|
||||
client = client(stale_lock_expiration: 0.5)
|
||||
|
||||
t1 = Thread.new { client.lock(TEST_KEY) { sleep 0.6; success_counter << 1 } }
|
||||
sleep 0.55
|
||||
t2 = Thread.new do
|
||||
locked = client.lock(TEST_KEY) { success_counter << 1 }
|
||||
failure_counter << 1 unless locked
|
||||
end
|
||||
|
||||
[t1, t2].map(&:join)
|
||||
|
||||
assert_equal 2, success_counter.size
|
||||
assert_equal 0, failure_counter.size
|
||||
assert_equal false, client.locked?(TEST_KEY)
|
||||
end
|
||||
|
||||
def test_refresh
|
||||
client = client(stale_lock_expiration: 0.5)
|
||||
|
||||
lock1 = client.lock(TEST_KEY)
|
||||
|
||||
assert_equal true, client.locked?(TEST_KEY)
|
||||
|
||||
client.refresh(TEST_KEY, lock1)
|
||||
|
||||
assert_equal true, client.locked?(TEST_KEY)
|
||||
|
||||
sleep 0.55
|
||||
|
||||
assert_equal false, client.locked?(TEST_KEY)
|
||||
|
||||
lock2 = client.lock(TEST_KEY)
|
||||
|
||||
client.refresh(TEST_KEY, lock1)
|
||||
|
||||
assert_equal true, client.locked?(TEST_KEY)
|
||||
|
||||
client.unlock(TEST_KEY, lock1)
|
||||
|
||||
# edge case with refresh lock in the middle
|
||||
assert_equal true, client.locked?(TEST_KEY)
|
||||
|
||||
client.clear(TEST_KEY)
|
||||
|
||||
assert_equal false, client.locked?(TEST_KEY)
|
||||
|
||||
client.refresh(TEST_KEY, lock2)
|
||||
|
||||
assert_equal true, client.locked?(TEST_KEY)
|
||||
|
||||
client.unlock(TEST_KEY, lock2)
|
||||
|
||||
# now finally unlocked
|
||||
assert_equal false, client.locked?(TEST_KEY)
|
||||
end
|
||||
|
||||
def test_block_refresh
|
||||
success_counter = Queue.new
|
||||
failure_counter = Queue.new
|
||||
|
||||
client = client(stale_lock_expiration: 0.5)
|
||||
|
||||
t1 = Thread.new do
|
||||
client.lock(TEST_KEY) do |token|
|
||||
sleep 0.6
|
||||
client.refresh(TEST_KEY, token)
|
||||
sleep 1
|
||||
success_counter << 1
|
||||
end
|
||||
end
|
||||
|
||||
t2 = Thread.new do
|
||||
sleep 0.8
|
||||
locked = client.lock(TEST_KEY) { success_counter << 1 }
|
||||
failure_counter << 1 unless locked
|
||||
end
|
||||
|
||||
[t1, t2].map(&:join)
|
||||
|
||||
assert_equal 1, success_counter.size
|
||||
assert_equal 1, failure_counter.size
|
||||
assert_equal false, client.locked?(TEST_KEY)
|
||||
end
|
||||
|
||||
def test_refresh_multi
|
||||
success_counter = Queue.new
|
||||
failure_counter = Queue.new
|
||||
|
||||
client = client(stale_lock_expiration: 0.5)
|
||||
|
||||
t1 = Thread.new do
|
||||
client.lock(TEST_KEY, 2) do |token|
|
||||
sleep 0.4
|
||||
client.refresh(TEST_KEY, token)
|
||||
success_counter << 1
|
||||
sleep 0.5
|
||||
end
|
||||
end
|
||||
|
||||
t2 = Thread.new do
|
||||
sleep 0.55
|
||||
locked = client.lock(TEST_KEY, 2) do
|
||||
success_counter << 1
|
||||
sleep 0.5
|
||||
end
|
||||
|
||||
failure_counter << 1 unless locked
|
||||
end
|
||||
|
||||
t3 = Thread.new do
|
||||
sleep 0.75
|
||||
locked = client.lock(TEST_KEY, 2) { success_counter << 1 }
|
||||
failure_counter << 1 unless locked
|
||||
end
|
||||
|
||||
[t1, t2, t3].map(&:join)
|
||||
|
||||
assert_equal 2, success_counter.size
|
||||
assert_equal 1, failure_counter.size
|
||||
assert_equal false, client.locked?(TEST_KEY)
|
||||
end
|
||||
end
|
||||
|
||||
@@ -142,6 +312,18 @@ class TestBaseClient < Minitest::Test
|
||||
assert_raises(NotImplementedError) do
|
||||
@client.send(:get, TEST_KEY)
|
||||
end
|
||||
|
||||
assert_raises(NotImplementedError) do
|
||||
@client.send(:set, TEST_KEY, "", "")
|
||||
end
|
||||
|
||||
assert_raises(NotImplementedError) do
|
||||
@client.send(:initial_set, TEST_KEY)
|
||||
end
|
||||
|
||||
assert_raises(NotImplementedError) do
|
||||
@client.send(:clear, TEST_KEY)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@@ -151,6 +333,7 @@ class TestMemcachedClient < Minitest::Test
|
||||
def setup
|
||||
@dalli = Dalli::Client.new("127.0.0.1:11211")
|
||||
@client = Suo::Client::Memcached.new
|
||||
teardown
|
||||
end
|
||||
|
||||
def teardown
|
||||
@@ -164,6 +347,7 @@ class TestRedisClient < Minitest::Test
|
||||
def setup
|
||||
@redis = Redis.new
|
||||
@client = Suo::Client::Redis.new
|
||||
teardown
|
||||
end
|
||||
|
||||
def teardown
|
||||
|
||||
@@ -1,5 +1,10 @@
|
||||
$LOAD_PATH.unshift File.expand_path("../../lib", __FILE__)
|
||||
|
||||
if ENV["CODECLIMATE_REPO_TOKEN"]
|
||||
require "codeclimate-test-reporter"
|
||||
CodeClimate::TestReporter.start
|
||||
end
|
||||
|
||||
require "suo"
|
||||
require "thread"
|
||||
require "minitest/autorun"
|
||||
|
||||
Reference in New Issue
Block a user