mirror of
https://github.com/dkam/suo.git
synced 2025-01-29 07:42:43 +00:00
Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
323caaee9b | ||
|
|
745d49466f | ||
|
|
161d50deb9 | ||
|
|
81e4a3e143 | ||
|
|
2960c14a4d | ||
|
|
308e918e60 | ||
|
|
aaee69a2df | ||
|
|
c6d1c29ada | ||
|
|
14e442e99d | ||
|
|
498073b92e |
10
CHANGELOG.md
10
CHANGELOG.md
@@ -1,3 +1,11 @@
|
||||
## 0.3.0
|
||||
|
||||
- Dramatically simplify the interface by forcing clients to specify the key & resources at lock initialization instead of every method call.
|
||||
|
||||
## 0.2.3
|
||||
|
||||
- Clarify documentation further with respect to semaphores.
|
||||
|
||||
## 0.2.2
|
||||
|
||||
- Fix bug with refresh - typo would've prevented real use.
|
||||
@@ -25,7 +33,7 @@
|
||||
|
||||
## 0.1.1
|
||||
|
||||
- Use [MessagePack](https://github.com/msgpack/msgpack-ruby) for semaphore serialization.
|
||||
- Use [MessagePack](https://github.com/msgpack/msgpack-ruby) for lock serialization.
|
||||
|
||||
## 0.1.0
|
||||
|
||||
|
||||
33
README.md
33
README.md
@@ -2,7 +2,7 @@
|
||||
|
||||
:lock: Distributed semaphores using Memcached or Redis in Ruby.
|
||||
|
||||
Suo provides a very performant distributed lock solution using Compare-And-Set (`CAS`) commands in Memcached, and `WATCH/MULTI` in Redis.
|
||||
Suo provides a very performant distributed lock solution using Compare-And-Set (`CAS`) commands in Memcached, and `WATCH/MULTI` in Redis. It allows locking both single exclusion (like a mutex - sharing one resource), as well as multiple resources.
|
||||
|
||||
## Installation
|
||||
|
||||
@@ -18,37 +18,40 @@ gem 'suo'
|
||||
|
||||
```ruby
|
||||
# Memcached
|
||||
suo = Suo::Client::Memcached.new(connection: "127.0.0.1:11211")
|
||||
suo = Suo::Client::Memcached.new("foo_resource", connection: "127.0.0.1:11211")
|
||||
|
||||
# Redis
|
||||
suo = Suo::Client::Redis.new(connection: {host: "10.0.1.1"})
|
||||
suo = Suo::Client::Redis.new("baz_resource", connection: {host: "10.0.1.1"})
|
||||
|
||||
# Pre-existing client
|
||||
suo = Suo::Client::Memcached.new(client: some_dalli_client)
|
||||
suo = Suo::Client::Memcached.new("bar_resource", client: some_dalli_client)
|
||||
|
||||
suo.lock("some_key") do
|
||||
suo.lock do
|
||||
# critical code here
|
||||
@puppies.pet!
|
||||
end
|
||||
|
||||
Thread.new { suo.lock("other_key", 2) { puts "One"; sleep 2 } }
|
||||
Thread.new { suo.lock("other_key", 2) { puts "Two"; sleep 2 } }
|
||||
Thread.new { suo.lock("other_key", 2) { puts "Three" } }
|
||||
# The resources argument is the number of resources the semaphore will allow to lock (defaulting to one - a mutex)
|
||||
suo = Suo::Client::Memcached.new("bar_resource", client: some_dalli_client, resources: 2)
|
||||
|
||||
Thread.new { suo.lock{ puts "One"; sleep 2 } }
|
||||
Thread.new { suo.lock { puts "Two"; sleep 2 } }
|
||||
Thread.new { suo.lock { puts "Three" } }
|
||||
|
||||
# will print "One" "Two", but not "Three", as there are only 2 resources
|
||||
|
||||
# custom acquisition timeouts (time to acquire)
|
||||
suo = Suo::Client::Memcached.new(client: some_dalli_client, acquisition_timeout: 1) # in seconds
|
||||
suo = Suo::Client::Memcached.new("protected_key", client: some_dalli_client, acquisition_timeout: 1) # in seconds
|
||||
|
||||
# manually locking/unlocking
|
||||
# the return value from lock without a block is a unique token valid only for the current lock
|
||||
# which must be unlocked manually
|
||||
lock = suo.lock("a_key")
|
||||
token = suo
|
||||
foo.baz!
|
||||
suo.unlock("a_key", lock)
|
||||
suo.unlock(token)
|
||||
|
||||
# custom stale lock expiration (cleaning of dead locks)
|
||||
suo = Suo::Client::Redis.new(client: some_redis_client, stale_lock_expiration: 60*5)
|
||||
suo = Suo::Client::Redis.new("other_key", client: some_redis_client, stale_lock_expiration: 60*5)
|
||||
```
|
||||
|
||||
### Stale locks
|
||||
@@ -58,13 +61,13 @@ suo = Suo::Client::Redis.new(client: some_redis_client, stale_lock_expiration: 6
|
||||
To re-acquire a lock in the middle of a block, you can use the refresh method on client.
|
||||
|
||||
```ruby
|
||||
suo = Suo::Client::Redis.new
|
||||
suo = Suo::Client::Redis.new("foo")
|
||||
|
||||
# lock is the same token as seen in the manual example, above
|
||||
suo.lock("foo") do |lock|
|
||||
suo.lock do |token|
|
||||
5.times do
|
||||
baz.bar!
|
||||
suo.refresh("foo", lock)
|
||||
suo.refresh(token)
|
||||
end
|
||||
end
|
||||
```
|
||||
|
||||
@@ -4,97 +4,101 @@ module Suo
|
||||
DEFAULT_OPTIONS = {
|
||||
acquisition_timeout: 0.1,
|
||||
acquisition_delay: 0.01,
|
||||
stale_lock_expiration: 3600
|
||||
stale_lock_expiration: 3600,
|
||||
resources: 1
|
||||
}.freeze
|
||||
|
||||
attr_accessor :client
|
||||
attr_accessor :client, :key, :resources, :options
|
||||
|
||||
include MonitorMixin
|
||||
|
||||
def initialize(options = {})
|
||||
def initialize(key, options = {})
|
||||
fail "Client required" unless options[:client]
|
||||
@options = DEFAULT_OPTIONS.merge(options)
|
||||
@retry_count = (@options[:acquisition_timeout] / @options[:acquisition_delay].to_f).ceil
|
||||
@client = @options[:client]
|
||||
super()
|
||||
@resources = @options[:resources].to_i
|
||||
@key = key
|
||||
super() # initialize Monitor mixin for thread safety
|
||||
end
|
||||
|
||||
def lock(key, resources = 1)
|
||||
token = acquire_lock(key, resources)
|
||||
def lock
|
||||
token = acquire_lock
|
||||
|
||||
if block_given? && token
|
||||
begin
|
||||
yield(token)
|
||||
yield
|
||||
ensure
|
||||
unlock(key, token)
|
||||
unlock(token)
|
||||
end
|
||||
else
|
||||
token
|
||||
end
|
||||
end
|
||||
|
||||
def locked?(key, resources = 1)
|
||||
locks(key).size >= resources
|
||||
def locked?
|
||||
locks.size >= resources
|
||||
end
|
||||
|
||||
def locks(key)
|
||||
val, _ = get(key)
|
||||
def locks
|
||||
val, _ = get
|
||||
cleared_locks = deserialize_and_clear_locks(val)
|
||||
|
||||
cleared_locks
|
||||
end
|
||||
|
||||
def refresh(key, acquisition_token)
|
||||
retry_with_timeout(key) do
|
||||
val, cas = get(key)
|
||||
def refresh(token)
|
||||
retry_with_timeout do
|
||||
val, cas = get
|
||||
|
||||
if val.nil?
|
||||
initial_set(key)
|
||||
initial_set
|
||||
next
|
||||
end
|
||||
|
||||
cleared_locks = deserialize_and_clear_locks(val)
|
||||
|
||||
refresh_lock(cleared_locks, acquisition_token)
|
||||
refresh_lock(cleared_locks, token)
|
||||
|
||||
break if set(key, serialize_locks(cleared_locks), cas)
|
||||
break if set(serialize_locks(cleared_locks), cas)
|
||||
end
|
||||
end
|
||||
|
||||
def unlock(key, acquisition_token)
|
||||
return unless acquisition_token
|
||||
def unlock(token)
|
||||
return unless token
|
||||
|
||||
retry_with_timeout(key) do
|
||||
val, cas = get(key)
|
||||
retry_with_timeout do
|
||||
val, cas = get
|
||||
|
||||
break if val.nil?
|
||||
|
||||
cleared_locks = deserialize_and_clear_locks(val)
|
||||
|
||||
acquisition_lock = remove_lock(cleared_locks, acquisition_token)
|
||||
acquisition_lock = remove_lock(cleared_locks, token)
|
||||
|
||||
break unless acquisition_lock
|
||||
break if set(key, serialize_locks(cleared_locks), cas)
|
||||
break if set(serialize_locks(cleared_locks), cas)
|
||||
end
|
||||
rescue LockClientError => _ # rubocop:disable Lint/HandleExceptions
|
||||
# ignore - assume success due to optimistic locking
|
||||
end
|
||||
|
||||
def clear(key) # rubocop:disable Lint/UnusedMethodArgument
|
||||
def clear
|
||||
fail NotImplementedError
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def acquire_lock(key, resources = 1)
|
||||
acquisition_token = nil
|
||||
attr_accessor :retry_count
|
||||
|
||||
def acquire_lock
|
||||
token = SecureRandom.base64(16)
|
||||
|
||||
retry_with_timeout(key) do
|
||||
val, cas = get(key)
|
||||
retry_with_timeout do
|
||||
val, cas = get
|
||||
|
||||
if val.nil?
|
||||
initial_set(key)
|
||||
initial_set
|
||||
next
|
||||
end
|
||||
|
||||
@@ -105,44 +109,41 @@ module Suo
|
||||
|
||||
newval = serialize_locks(cleared_locks)
|
||||
|
||||
if set(key, newval, cas)
|
||||
acquisition_token = token
|
||||
break
|
||||
end
|
||||
return token if set(newval, cas)
|
||||
end
|
||||
end
|
||||
|
||||
acquisition_token
|
||||
nil
|
||||
end
|
||||
|
||||
def get(key) # rubocop:disable Lint/UnusedMethodArgument
|
||||
def get
|
||||
fail NotImplementedError
|
||||
end
|
||||
|
||||
def set(key, newval, cas) # rubocop:disable Lint/UnusedMethodArgument
|
||||
def set(newval, cas) # rubocop:disable Lint/UnusedMethodArgument
|
||||
fail NotImplementedError
|
||||
end
|
||||
|
||||
def initial_set(key, val = "") # rubocop:disable Lint/UnusedMethodArgument
|
||||
def initial_set(val = "") # rubocop:disable Lint/UnusedMethodArgument
|
||||
fail NotImplementedError
|
||||
end
|
||||
|
||||
def synchronize(key) # rubocop:disable Lint/UnusedMethodArgument
|
||||
def synchronize
|
||||
mon_synchronize { yield }
|
||||
end
|
||||
|
||||
def retry_with_timeout(key)
|
||||
def retry_with_timeout
|
||||
start = Time.now.to_f
|
||||
|
||||
@retry_count.times do
|
||||
now = Time.now.to_f
|
||||
break if now - start > @options[:acquisition_timeout]
|
||||
retry_count.times do
|
||||
elapsed = Time.now.to_f - start
|
||||
break if elapsed >= options[:acquisition_timeout]
|
||||
|
||||
synchronize(key) do
|
||||
synchronize do
|
||||
yield
|
||||
end
|
||||
|
||||
sleep(rand(@options[:acquisition_delay] * 1000).to_f / 1000)
|
||||
sleep(rand(options[:acquisition_delay] * 1000).to_f / 1000)
|
||||
end
|
||||
rescue => _
|
||||
raise LockClientError
|
||||
@@ -167,7 +168,7 @@ module Suo
|
||||
end
|
||||
|
||||
def clear_expired_locks(locks)
|
||||
expired = Time.now - @options[:stale_lock_expiration]
|
||||
expired = Time.now - options[:stale_lock_expiration]
|
||||
locks.reject { |time, _| time < expired }
|
||||
end
|
||||
|
||||
|
||||
@@ -1,27 +1,27 @@
|
||||
module Suo
|
||||
module Client
|
||||
class Memcached < Base
|
||||
def initialize(options = {})
|
||||
def initialize(key, options = {})
|
||||
options[:client] ||= Dalli::Client.new(options[:connection] || ENV["MEMCACHE_SERVERS"] || "127.0.0.1:11211")
|
||||
super
|
||||
end
|
||||
|
||||
def clear(key)
|
||||
@client.delete(key)
|
||||
def clear
|
||||
@client.delete(@key)
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def get(key)
|
||||
@client.get_cas(key)
|
||||
def get
|
||||
@client.get_cas(@key)
|
||||
end
|
||||
|
||||
def set(key, newval, cas)
|
||||
@client.set_cas(key, newval, cas)
|
||||
def set(newval, cas)
|
||||
@client.set_cas(@key, newval, cas)
|
||||
end
|
||||
|
||||
def initial_set(key, val = "")
|
||||
@client.set(key, val)
|
||||
def initial_set(val = "")
|
||||
@client.set(@key, val)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@@ -1,39 +1,39 @@
|
||||
module Suo
|
||||
module Client
|
||||
class Redis < Base
|
||||
def initialize(options = {})
|
||||
def initialize(key, options = {})
|
||||
options[:client] ||= ::Redis.new(options[:connection] || {})
|
||||
super
|
||||
end
|
||||
|
||||
def clear(key)
|
||||
@client.del(key)
|
||||
def clear
|
||||
@client.del(@key)
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def get(key)
|
||||
[@client.get(key), nil]
|
||||
def get
|
||||
[@client.get(@key), nil]
|
||||
end
|
||||
|
||||
def set(key, newval, _)
|
||||
def set(newval, _)
|
||||
ret = @client.multi do |multi|
|
||||
multi.set(key, newval)
|
||||
multi.set(@key, newval)
|
||||
end
|
||||
|
||||
ret && ret[0] == "OK"
|
||||
end
|
||||
|
||||
def synchronize(key)
|
||||
@client.watch(key) do
|
||||
def synchronize
|
||||
@client.watch(@key) do
|
||||
yield
|
||||
end
|
||||
ensure
|
||||
@client.unwatch
|
||||
end
|
||||
|
||||
def initial_set(key, val = "")
|
||||
@client.set(key, val)
|
||||
def initial_set(val = "")
|
||||
@client.set(@key, val)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
module Suo
|
||||
VERSION = "0.2.2"
|
||||
VERSION = "0.3.0"
|
||||
end
|
||||
|
||||
@@ -9,8 +9,8 @@ Gem::Specification.new do |spec|
|
||||
spec.authors = ["Nick Elser"]
|
||||
spec.email = ["nick.elser@gmail.com"]
|
||||
|
||||
spec.summary = %q(Distributed semaphores using Memcached or Redis.)
|
||||
spec.description = %q(Distributed semaphores using Memcached or Redis.)
|
||||
spec.summary = %q(Distributed locks (mutexes & semaphores) using Memcached or Redis.)
|
||||
spec.description = %q(Distributed locks (mutexes & semaphores) using Memcached or Redis.)
|
||||
spec.homepage = "https://github.com/nickelser/suo"
|
||||
spec.license = "MIT"
|
||||
|
||||
|
||||
@@ -3,249 +3,247 @@ require "test_helper"
|
||||
TEST_KEY = "suo_test_key".freeze
|
||||
|
||||
module ClientTests
|
||||
def client(options)
|
||||
@client.class.new(options.merge(client: @client.client))
|
||||
def client(options = {})
|
||||
@client.class.new(options[:key] || TEST_KEY, options.merge(client: @client.client))
|
||||
end
|
||||
|
||||
def test_throws_failed_error_on_bad_client
|
||||
assert_raises(Suo::LockClientError) do
|
||||
client = @client.class.new(client: {})
|
||||
client.lock(TEST_KEY, 1)
|
||||
client = @client.class.new(TEST_KEY, client: {})
|
||||
client.lock
|
||||
end
|
||||
end
|
||||
|
||||
def test_single_resource_locking
|
||||
lock1 = @client.lock(TEST_KEY, 1)
|
||||
lock1 = @client.lock
|
||||
refute_nil lock1
|
||||
|
||||
locked = @client.locked?(TEST_KEY, 1)
|
||||
locked = @client.locked?
|
||||
assert_equal true, locked
|
||||
|
||||
lock2 = @client.lock(TEST_KEY, 1)
|
||||
lock2 = @client.lock
|
||||
assert_nil lock2
|
||||
|
||||
@client.unlock(TEST_KEY, lock1)
|
||||
@client.unlock(lock1)
|
||||
|
||||
locked = @client.locked?(TEST_KEY, 1)
|
||||
locked = @client.locked?
|
||||
|
||||
assert_equal false, locked
|
||||
end
|
||||
|
||||
def test_empty_lock_on_invalid_data
|
||||
@client.send(:initial_set, TEST_KEY, "bad value")
|
||||
locked = @client.locked?(TEST_KEY)
|
||||
assert_equal false, locked
|
||||
@client.send(:initial_set, "bad value")
|
||||
assert_equal false, @client.locked?
|
||||
end
|
||||
|
||||
def test_clear
|
||||
lock1 = @client.lock(TEST_KEY, 1)
|
||||
lock1 = @client.lock
|
||||
refute_nil lock1
|
||||
|
||||
@client.clear(TEST_KEY)
|
||||
@client.clear
|
||||
|
||||
locked = @client.locked?(TEST_KEY, 1)
|
||||
|
||||
assert_equal false, locked
|
||||
assert_equal false, @client.locked?
|
||||
end
|
||||
|
||||
def test_multiple_resource_locking
|
||||
lock1 = @client.lock(TEST_KEY, 2)
|
||||
@client = client(resources: 2)
|
||||
|
||||
lock1 = @client.lock
|
||||
refute_nil lock1
|
||||
|
||||
locked = @client.locked?(TEST_KEY, 2)
|
||||
assert_equal false, locked
|
||||
assert_equal false, @client.locked?
|
||||
|
||||
lock2 = @client.lock(TEST_KEY, 2)
|
||||
lock2 = @client.lock
|
||||
refute_nil lock2
|
||||
|
||||
locked = @client.locked?(TEST_KEY, 2)
|
||||
assert_equal true, locked
|
||||
assert_equal true, @client.locked?
|
||||
|
||||
@client.unlock(TEST_KEY, lock1)
|
||||
@client.unlock(lock1)
|
||||
|
||||
locked = @client.locked?(TEST_KEY, 1)
|
||||
assert_equal true, locked
|
||||
assert_equal false, @client.locked?
|
||||
|
||||
@client.unlock(TEST_KEY, lock2)
|
||||
assert_equal 1, @client.locks.size
|
||||
|
||||
locked = @client.locked?(TEST_KEY, 1)
|
||||
assert_equal false, locked
|
||||
@client.unlock(lock2)
|
||||
|
||||
assert_equal false, @client.locked?
|
||||
assert_equal 0, @client.locks.size
|
||||
end
|
||||
|
||||
def test_block_single_resource_locking
|
||||
locked = false
|
||||
|
||||
@client.lock(TEST_KEY, 1) { locked = true }
|
||||
@client.lock { locked = true }
|
||||
|
||||
assert_equal true, locked
|
||||
end
|
||||
|
||||
def test_block_unlocks_on_exception
|
||||
assert_raises(RuntimeError) do
|
||||
@client.lock(TEST_KEY, 1) { fail "Test" }
|
||||
@client.lock{ fail "Test" }
|
||||
end
|
||||
|
||||
locked = @client.locked?(TEST_KEY, 1)
|
||||
assert_equal false, locked
|
||||
assert_equal false, @client.locked?
|
||||
end
|
||||
|
||||
def test_readme_example
|
||||
output = Queue.new
|
||||
@client = client(resources: 2)
|
||||
threads = []
|
||||
|
||||
threads << Thread.new { @client.lock(TEST_KEY, 2) { output << "One"; sleep 0.5 } }
|
||||
threads << Thread.new { @client.lock(TEST_KEY, 2) { output << "Two"; sleep 0.5 } }
|
||||
threads << Thread.new { @client.lock { output << "One"; sleep 0.5 } }
|
||||
threads << Thread.new { @client.lock { output << "Two"; sleep 0.5 } }
|
||||
sleep 0.1
|
||||
threads << Thread.new { @client.lock(TEST_KEY, 2) { output << "Three" } }
|
||||
threads << Thread.new { @client.lock { output << "Three" } }
|
||||
|
||||
threads.map(&:join)
|
||||
threads.each(&:join)
|
||||
|
||||
ret = []
|
||||
|
||||
ret << output.pop
|
||||
ret << output.pop
|
||||
ret << (output.size > 0 ? output.pop : nil)
|
||||
ret << (output.size > 0 ? output.pop : nil)
|
||||
|
||||
ret.sort!
|
||||
|
||||
assert_equal 0, output.size
|
||||
assert_equal %w(One Two), ret
|
||||
assert_equal false, @client.locked?(TEST_KEY)
|
||||
assert_equal false, @client.locked?
|
||||
end
|
||||
|
||||
def test_block_multiple_resource_locking
|
||||
success_counter = Queue.new
|
||||
failure_counter = Queue.new
|
||||
|
||||
client = client(acquisition_timeout: 0.9)
|
||||
@client = client(acquisition_timeout: 0.9, resources: 50)
|
||||
|
||||
100.times.map do |i|
|
||||
Thread.new do
|
||||
success = client.lock(TEST_KEY, 50) do
|
||||
success = @client.lock do
|
||||
sleep(3)
|
||||
success_counter << i
|
||||
end
|
||||
|
||||
failure_counter << i unless success
|
||||
end
|
||||
end.map(&:join)
|
||||
end.each(&:join)
|
||||
|
||||
assert_equal 50, success_counter.size
|
||||
assert_equal 50, failure_counter.size
|
||||
assert_equal false, client.locked?(TEST_KEY)
|
||||
assert_equal false, @client.locked?
|
||||
end
|
||||
|
||||
def test_block_multiple_resource_locking_longer_timeout
|
||||
success_counter = Queue.new
|
||||
failure_counter = Queue.new
|
||||
|
||||
client = client(acquisition_timeout: 3)
|
||||
@client = client(acquisition_timeout: 3, resources: 50)
|
||||
|
||||
100.times.map do |i|
|
||||
Thread.new do
|
||||
success = client.lock(TEST_KEY, 50) do
|
||||
success = @client.lock do
|
||||
sleep(0.5)
|
||||
success_counter << i
|
||||
end
|
||||
|
||||
failure_counter << i unless success
|
||||
end
|
||||
end.map(&:join)
|
||||
end.each(&:join)
|
||||
|
||||
assert_equal 100, success_counter.size
|
||||
assert_equal 0, failure_counter.size
|
||||
assert_equal false, client.locked?(TEST_KEY)
|
||||
assert_equal false, @client.locked?
|
||||
end
|
||||
|
||||
def test_unstale_lock_acquisition
|
||||
success_counter = Queue.new
|
||||
failure_counter = Queue.new
|
||||
|
||||
client = client(stale_lock_expiration: 0.5)
|
||||
@client = client(stale_lock_expiration: 0.5)
|
||||
|
||||
t1 = Thread.new { client.lock(TEST_KEY) { sleep 0.6; success_counter << 1 } }
|
||||
t1 = Thread.new { @client.lock { sleep 0.6; success_counter << 1 } }
|
||||
sleep 0.3
|
||||
t2 = Thread.new do
|
||||
locked = client.lock(TEST_KEY) { success_counter << 1 }
|
||||
locked = @client.lock { success_counter << 1 }
|
||||
failure_counter << 1 unless locked
|
||||
end
|
||||
|
||||
[t1, t2].map(&:join)
|
||||
[t1, t2].each(&:join)
|
||||
|
||||
assert_equal 1, success_counter.size
|
||||
assert_equal 1, failure_counter.size
|
||||
assert_equal false, client.locked?(TEST_KEY)
|
||||
assert_equal false, @client.locked?
|
||||
end
|
||||
|
||||
def test_stale_lock_acquisition
|
||||
success_counter = Queue.new
|
||||
failure_counter = Queue.new
|
||||
|
||||
client = client(stale_lock_expiration: 0.5)
|
||||
@client = client(stale_lock_expiration: 0.5)
|
||||
|
||||
t1 = Thread.new { client.lock(TEST_KEY) { sleep 0.6; success_counter << 1 } }
|
||||
t1 = Thread.new { @client.lock { sleep 0.6; success_counter << 1 } }
|
||||
sleep 0.55
|
||||
t2 = Thread.new do
|
||||
locked = client.lock(TEST_KEY) { success_counter << 1 }
|
||||
locked = @client.lock { success_counter << 1 }
|
||||
failure_counter << 1 unless locked
|
||||
end
|
||||
|
||||
[t1, t2].map(&:join)
|
||||
[t1, t2].each(&:join)
|
||||
|
||||
assert_equal 2, success_counter.size
|
||||
assert_equal 0, failure_counter.size
|
||||
assert_equal false, client.locked?(TEST_KEY)
|
||||
assert_equal false, @client.locked?
|
||||
end
|
||||
|
||||
def test_refresh
|
||||
client = client(stale_lock_expiration: 0.5)
|
||||
@client = client(stale_lock_expiration: 0.5)
|
||||
|
||||
lock1 = client.lock(TEST_KEY)
|
||||
lock1 = @client.lock
|
||||
|
||||
assert_equal true, client.locked?(TEST_KEY)
|
||||
assert_equal true, @client.locked?
|
||||
|
||||
client.refresh(TEST_KEY, lock1)
|
||||
@client.refresh(lock1)
|
||||
|
||||
assert_equal true, client.locked?(TEST_KEY)
|
||||
assert_equal true, @client.locked?
|
||||
|
||||
sleep 0.55
|
||||
|
||||
assert_equal false, client.locked?(TEST_KEY)
|
||||
assert_equal false, @client.locked?
|
||||
|
||||
lock2 = client.lock(TEST_KEY)
|
||||
lock2 = @client.lock
|
||||
|
||||
client.refresh(TEST_KEY, lock1)
|
||||
@client.refresh(lock1)
|
||||
|
||||
assert_equal true, client.locked?(TEST_KEY)
|
||||
assert_equal true, @client.locked?
|
||||
|
||||
client.unlock(TEST_KEY, lock1)
|
||||
@client.unlock(lock1)
|
||||
|
||||
# edge case with refresh lock in the middle
|
||||
assert_equal true, client.locked?(TEST_KEY)
|
||||
assert_equal true, @client.locked?
|
||||
|
||||
client.clear(TEST_KEY)
|
||||
@client.clear
|
||||
|
||||
assert_equal false, client.locked?(TEST_KEY)
|
||||
assert_equal false, @client.locked?
|
||||
|
||||
client.refresh(TEST_KEY, lock2)
|
||||
@client.refresh(lock2)
|
||||
|
||||
assert_equal true, client.locked?(TEST_KEY)
|
||||
assert_equal true, @client.locked?
|
||||
|
||||
client.unlock(TEST_KEY, lock2)
|
||||
@client.unlock(lock2)
|
||||
|
||||
# now finally unlocked
|
||||
assert_equal false, client.locked?(TEST_KEY)
|
||||
assert_equal false, @client.locked?
|
||||
end
|
||||
|
||||
def test_block_refresh
|
||||
success_counter = Queue.new
|
||||
failure_counter = Queue.new
|
||||
|
||||
client = client(stale_lock_expiration: 0.5)
|
||||
@client = client(stale_lock_expiration: 0.5)
|
||||
|
||||
t1 = Thread.new do
|
||||
client.lock(TEST_KEY) do |token|
|
||||
@client.lock do |token|
|
||||
sleep 0.6
|
||||
client.refresh(TEST_KEY, token)
|
||||
@client.refresh(token)
|
||||
sleep 1
|
||||
success_counter << 1
|
||||
end
|
||||
@@ -253,27 +251,27 @@ module ClientTests
|
||||
|
||||
t2 = Thread.new do
|
||||
sleep 0.8
|
||||
locked = client.lock(TEST_KEY) { success_counter << 1 }
|
||||
locked = @client.lock { success_counter << 1 }
|
||||
failure_counter << 1 unless locked
|
||||
end
|
||||
|
||||
[t1, t2].map(&:join)
|
||||
[t1, t2].each(&:join)
|
||||
|
||||
assert_equal 1, success_counter.size
|
||||
assert_equal 1, failure_counter.size
|
||||
assert_equal false, client.locked?(TEST_KEY)
|
||||
assert_equal false, @client.locked?
|
||||
end
|
||||
|
||||
def test_refresh_multi
|
||||
success_counter = Queue.new
|
||||
failure_counter = Queue.new
|
||||
|
||||
client = client(stale_lock_expiration: 0.5)
|
||||
@client = client(stale_lock_expiration: 0.5, resources: 2)
|
||||
|
||||
t1 = Thread.new do
|
||||
client.lock(TEST_KEY, 2) do |token|
|
||||
@client.lock do |token|
|
||||
sleep 0.4
|
||||
client.refresh(TEST_KEY, token)
|
||||
@client.refresh(token)
|
||||
success_counter << 1
|
||||
sleep 0.5
|
||||
end
|
||||
@@ -281,7 +279,7 @@ module ClientTests
|
||||
|
||||
t2 = Thread.new do
|
||||
sleep 0.55
|
||||
locked = client.lock(TEST_KEY, 2) do
|
||||
locked = @client.lock do
|
||||
success_counter << 1
|
||||
sleep 0.5
|
||||
end
|
||||
@@ -291,38 +289,69 @@ module ClientTests
|
||||
|
||||
t3 = Thread.new do
|
||||
sleep 0.75
|
||||
locked = client.lock(TEST_KEY, 2) { success_counter << 1 }
|
||||
locked = @client.lock { success_counter << 1 }
|
||||
failure_counter << 1 unless locked
|
||||
end
|
||||
|
||||
[t1, t2, t3].map(&:join)
|
||||
[t1, t2, t3].each(&:join)
|
||||
|
||||
assert_equal 2, success_counter.size
|
||||
assert_equal 1, failure_counter.size
|
||||
assert_equal false, client.locked?(TEST_KEY)
|
||||
assert_equal false, @client.locked?
|
||||
end
|
||||
|
||||
def test_increment_reused_client
|
||||
i = 0
|
||||
|
||||
threads = 2.times.map do
|
||||
Thread.new do
|
||||
@client.lock { i += 1 }
|
||||
end
|
||||
end
|
||||
|
||||
threads.each(&:join)
|
||||
|
||||
assert_equal 2, i
|
||||
assert_equal false, @client.locked?
|
||||
end
|
||||
|
||||
def test_increment_new_client
|
||||
i = 0
|
||||
|
||||
threads = 2.times.map do
|
||||
Thread.new do
|
||||
# note this is the method that generates a *new* client
|
||||
client.lock { i += 1 }
|
||||
end
|
||||
end
|
||||
|
||||
threads.each(&:join)
|
||||
|
||||
assert_equal 2, i
|
||||
assert_equal false, @client.locked?
|
||||
end
|
||||
end
|
||||
|
||||
class TestBaseClient < Minitest::Test
|
||||
def setup
|
||||
@client = Suo::Client::Base.new(client: {})
|
||||
@client = Suo::Client::Base.new(TEST_KEY, client: {})
|
||||
end
|
||||
|
||||
def test_not_implemented
|
||||
assert_raises(NotImplementedError) do
|
||||
@client.send(:get, TEST_KEY)
|
||||
@client.send(:get)
|
||||
end
|
||||
|
||||
assert_raises(NotImplementedError) do
|
||||
@client.send(:set, TEST_KEY, "", "")
|
||||
@client.send(:set, "", "")
|
||||
end
|
||||
|
||||
assert_raises(NotImplementedError) do
|
||||
@client.send(:initial_set, TEST_KEY)
|
||||
@client.send(:initial_set)
|
||||
end
|
||||
|
||||
assert_raises(NotImplementedError) do
|
||||
@client.send(:clear, TEST_KEY)
|
||||
@client.send(:clear)
|
||||
end
|
||||
end
|
||||
end
|
||||
@@ -332,7 +361,7 @@ class TestMemcachedClient < Minitest::Test
|
||||
|
||||
def setup
|
||||
@dalli = Dalli::Client.new("127.0.0.1:11211")
|
||||
@client = Suo::Client::Memcached.new
|
||||
@client = Suo::Client::Memcached.new(TEST_KEY)
|
||||
teardown
|
||||
end
|
||||
|
||||
@@ -346,7 +375,7 @@ class TestRedisClient < Minitest::Test
|
||||
|
||||
def setup
|
||||
@redis = Redis.new
|
||||
@client = Suo::Client::Redis.new
|
||||
@client = Suo::Client::Redis.new(TEST_KEY)
|
||||
teardown
|
||||
end
|
||||
|
||||
|
||||
Reference in New Issue
Block a user