mirror of
https://github.com/dkam/suo.git
synced 2025-01-29 07:42:43 +00:00
Compare commits
20 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
857fc63378 | ||
|
|
bb6762bbc6 | ||
|
|
f0977c89f2 | ||
|
|
4d5c96309f | ||
|
|
8d6061b137 | ||
|
|
ce0a4d8d86 | ||
|
|
1fd769eec2 | ||
|
|
37be5ae27b | ||
|
|
a1a226fb59 | ||
|
|
8d7ddaf35a | ||
|
|
1aacc0c1a1 | ||
|
|
8166c6b51d | ||
|
|
7662743123 | ||
|
|
a2fa281b86 | ||
|
|
887219b63d | ||
|
|
c0905fef91 | ||
|
|
a54b795e20 | ||
|
|
0828cd546b | ||
|
|
c2b9de4cf3 | ||
|
|
d4860423fa |
19
CHANGELOG.md
19
CHANGELOG.md
@@ -1,8 +1,25 @@
|
||||
## 0.2.1
|
||||
|
||||
- Fix bug when dealing with real-world Redis error conditions.
|
||||
|
||||
## 0.2.0
|
||||
|
||||
- Refactor class methods into instance methods to simplify implementation.
|
||||
- Increase thread safety with Memcached implementation.
|
||||
|
||||
## 0.1.3
|
||||
|
||||
- Properly throw Suo::LockClientError when the connection itself fails (Memcache server not reachable, etc.)
|
||||
|
||||
## 0.1.2
|
||||
|
||||
- Fix retry_timeout to properly use the full time (was being calculated incorrectly).
|
||||
- Refactor client implementations to re-use more code.
|
||||
|
||||
## 0.1.1
|
||||
|
||||
- Use [MessagePack](https://github.com/msgpack/msgpack-ruby) for semaphore serialization.
|
||||
|
||||
|
||||
## 0.1.0
|
||||
|
||||
- First release.
|
||||
|
||||
25
README.md
25
README.md
@@ -1,4 +1,4 @@
|
||||
# Suo [](https://travis-ci.org/nickelser/suo)
|
||||
# Suo [](https://travis-ci.org/nickelser/suo) [](http://badge.fury.io/rb/suo)
|
||||
|
||||
:lock: Distributed semaphores using Memcached or Redis in Ruby.
|
||||
|
||||
@@ -31,18 +31,27 @@ suo.lock("some_key") do
|
||||
@puppies.pet!
|
||||
end
|
||||
|
||||
2.times do
|
||||
Thread.new do
|
||||
# second argument is the number of resources - so this will run twice
|
||||
suo.lock("other_key", 2, timeout: 0.5) { puts "Will run twice!" }
|
||||
end
|
||||
end
|
||||
Thread.new { suo.lock("other_key", 2) { puts "One"; sleep 2 } }
|
||||
Thread.new { suo.lock("other_key", 2) { puts "Two"; sleep 2 } }
|
||||
Thread.new { suo.lock("other_key", 2) { puts "Three" } }
|
||||
|
||||
# will print "One" "Two", but not "Three", as there are only 2 resources
|
||||
|
||||
# custom acquisition timeouts (time to acquire)
|
||||
suo = Suo::Client::Memcached.new(client: some_dalli_client, acquisition_timeout: 1) # in seconds
|
||||
|
||||
# manually locking/unlocking
|
||||
suo.lock("a_key")
|
||||
foo.baz!
|
||||
suo.unlock("a_key")
|
||||
|
||||
# custom stale lock cleanup (cleaning of dead clients)
|
||||
suo = Suo::Client::Redis.new(client: some_redis_client, stale_lock_expiration: 60*5)
|
||||
```
|
||||
|
||||
## TODO
|
||||
- better stale key handling (refresh blocks)
|
||||
- more race condition tests
|
||||
- refactor clients to re-use more code
|
||||
|
||||
## History
|
||||
|
||||
|
||||
16
lib/suo.rb
16
lib/suo.rb
@@ -1,2 +1,16 @@
|
||||
require "securerandom"
|
||||
require "monitor"
|
||||
|
||||
require "dalli"
|
||||
require "dalli/cas/client"
|
||||
|
||||
require "redis"
|
||||
|
||||
require "msgpack"
|
||||
|
||||
require "suo/version"
|
||||
require "suo/clients"
|
||||
|
||||
require "suo/errors"
|
||||
require "suo/client/base"
|
||||
require "suo/client/memcached"
|
||||
require "suo/client/redis"
|
||||
|
||||
@@ -2,99 +2,172 @@ module Suo
|
||||
module Client
|
||||
class Base
|
||||
DEFAULT_OPTIONS = {
|
||||
retry_count: 3,
|
||||
retry_delay: 0.01,
|
||||
acquisition_timeout: 0.1,
|
||||
acquisition_delay: 0.01,
|
||||
stale_lock_expiration: 3600
|
||||
}.freeze
|
||||
|
||||
attr_accessor :client
|
||||
|
||||
include MonitorMixin
|
||||
|
||||
def initialize(options = {})
|
||||
@options = self.class.merge_defaults(options).merge(_initialized: true)
|
||||
fail "Client required" unless options[:client]
|
||||
@options = DEFAULT_OPTIONS.merge(options)
|
||||
@retry_count = (@options[:acquisition_timeout] / @options[:acquisition_delay].to_f).ceil
|
||||
@client = @options[:client]
|
||||
super()
|
||||
end
|
||||
|
||||
def lock(key, resources = 1, options = {})
|
||||
options = self.class.merge_defaults(@options.merge(options))
|
||||
token = self.class.lock(key, resources, options)
|
||||
def lock(key, resources = 1)
|
||||
token = acquire_lock(key, resources)
|
||||
|
||||
if token
|
||||
if block_given? && token
|
||||
begin
|
||||
yield if block_given?
|
||||
yield
|
||||
ensure
|
||||
self.class.unlock(key, token, options)
|
||||
unlock(key, token)
|
||||
end
|
||||
|
||||
true
|
||||
else
|
||||
false
|
||||
token
|
||||
end
|
||||
end
|
||||
|
||||
def locked?(key, resources = 1)
|
||||
self.class.locked?(key, resources, @options)
|
||||
locks(key).size >= resources
|
||||
end
|
||||
|
||||
class << self
|
||||
def lock(key, resources = 1, options = {}) # rubocop:disable Lint/UnusedMethodArgument
|
||||
def locks(key)
|
||||
val, _ = get(key)
|
||||
locks = deserialize_locks(val)
|
||||
|
||||
locks
|
||||
end
|
||||
|
||||
def refresh(key, acquisition_token)
|
||||
retry_with_timeout(key) do
|
||||
val, cas = get(key)
|
||||
|
||||
if val.nil?
|
||||
initial_set(key)
|
||||
next
|
||||
end
|
||||
|
||||
locks = deserialize_and_clear_locks(val)
|
||||
|
||||
refresh_lock(locks, acquisition_token)
|
||||
|
||||
break if set(key, serialize_locks(locks), cas)
|
||||
end
|
||||
end
|
||||
|
||||
def unlock(key, acquisition_token)
|
||||
return unless acquisition_token
|
||||
|
||||
retry_with_timeout(key) do
|
||||
val, cas = get(key)
|
||||
|
||||
break if val.nil?
|
||||
|
||||
locks = deserialize_and_clear_locks(val)
|
||||
|
||||
acquisition_lock = remove_lock(locks, acquisition_token)
|
||||
|
||||
break unless acquisition_lock
|
||||
break if set(key, serialize_locks(locks), cas)
|
||||
end
|
||||
rescue LockClientError => _ # rubocop:disable Lint/HandleExceptions
|
||||
# ignore - assume success due to optimistic locking
|
||||
end
|
||||
|
||||
def clear(key) # rubocop:disable Lint/UnusedMethodArgument
|
||||
fail NotImplementedError
|
||||
end
|
||||
|
||||
def locked?(key, resources = 1, options = {})
|
||||
options = merge_defaults(options)
|
||||
client = options[:client]
|
||||
locks = deserialize_locks(client.get(key))
|
||||
|
||||
locks.size >= resources
|
||||
end
|
||||
|
||||
def locks(key, options)
|
||||
options = merge_defaults(options)
|
||||
client = options[:client]
|
||||
locks = deserialize_locks(client.get(key))
|
||||
|
||||
locks.size
|
||||
end
|
||||
|
||||
def refresh(key, acquisition_token, options = {}) # rubocop:disable Lint/UnusedMethodArgument
|
||||
fail NotImplementedError
|
||||
end
|
||||
|
||||
def unlock(key, acquisition_token, options = {}) # rubocop:disable Lint/UnusedMethodArgument
|
||||
fail NotImplementedError
|
||||
end
|
||||
|
||||
def clear(key, options = {}) # rubocop:disable Lint/UnusedMethodArgument
|
||||
fail NotImplementedError
|
||||
end
|
||||
|
||||
def merge_defaults(options = {})
|
||||
unless options[:_initialized]
|
||||
options = self::DEFAULT_OPTIONS.merge(options)
|
||||
|
||||
fail "Client required" unless options[:client]
|
||||
end
|
||||
|
||||
if options[:retry_timeout]
|
||||
options[:retry_count] = (options[:retry_timeout] / options[:retry_delay].to_f).floor
|
||||
end
|
||||
|
||||
options
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def acquire_lock(key, resources = 1)
|
||||
acquisition_token = nil
|
||||
token = SecureRandom.base64(16)
|
||||
|
||||
retry_with_timeout(key) do
|
||||
val, cas = get(key)
|
||||
|
||||
if val.nil?
|
||||
initial_set(key)
|
||||
next
|
||||
end
|
||||
|
||||
locks = deserialize_and_clear_locks(val)
|
||||
|
||||
if locks.size < resources
|
||||
add_lock(locks, token)
|
||||
|
||||
newval = serialize_locks(locks)
|
||||
|
||||
if set(key, newval, cas)
|
||||
acquisition_token = token
|
||||
break
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
acquisition_token
|
||||
end
|
||||
|
||||
def get(key) # rubocop:disable Lint/UnusedMethodArgument
|
||||
fail NotImplementedError
|
||||
end
|
||||
|
||||
def set(key, newval, oldval) # rubocop:disable Lint/UnusedMethodArgument
|
||||
fail NotImplementedError
|
||||
end
|
||||
|
||||
def initial_set(key) # rubocop:disable Lint/UnusedMethodArgument
|
||||
fail NotImplementedError
|
||||
end
|
||||
|
||||
def synchronize(key) # rubocop:disable Lint/UnusedMethodArgument
|
||||
mon_synchronize { yield }
|
||||
end
|
||||
|
||||
def retry_with_timeout(key)
|
||||
start = Time.now.to_f
|
||||
|
||||
@retry_count.times do
|
||||
now = Time.now.to_f
|
||||
break if now - start > @options[:acquisition_timeout]
|
||||
|
||||
synchronize(key) do
|
||||
yield
|
||||
end
|
||||
|
||||
sleep(rand(@options[:acquisition_delay] * 1000).to_f / 1000)
|
||||
end
|
||||
rescue => _
|
||||
raise LockClientError
|
||||
end
|
||||
|
||||
def serialize_locks(locks)
|
||||
MessagePack.pack(locks.map { |time, token| [time.to_f, token] })
|
||||
end
|
||||
|
||||
def deserialize_and_clear_locks(val)
|
||||
clear_expired_locks(deserialize_locks(val))
|
||||
end
|
||||
|
||||
def deserialize_locks(val)
|
||||
MessagePack.unpack(val).map do |time, token|
|
||||
unpacked = (val.nil? || val == "") ? [] : MessagePack.unpack(val)
|
||||
|
||||
unpacked.map do |time, token|
|
||||
[Time.at(time), token]
|
||||
end
|
||||
rescue EOFError => _
|
||||
[]
|
||||
end
|
||||
|
||||
def clear_expired_locks(locks, options)
|
||||
expired = Time.now - options[:stale_lock_expiration]
|
||||
def clear_expired_locks(locks)
|
||||
expired = Time.now - @options[:stale_lock_expiration]
|
||||
locks.reject { |time, _| time < expired }
|
||||
end
|
||||
|
||||
@@ -114,4 +187,3 @@ module Suo
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@@ -1,7 +0,0 @@
|
||||
module Suo
|
||||
module Client
|
||||
module Errors
|
||||
class FailedToAcquireLock < StandardError; end
|
||||
end
|
||||
end
|
||||
end
|
||||
@@ -6,131 +6,22 @@ module Suo
|
||||
super
|
||||
end
|
||||
|
||||
class << self
|
||||
def lock(key, resources = 1, options = {})
|
||||
options = merge_defaults(options)
|
||||
acquisition_token = nil
|
||||
token = SecureRandom.base64(16)
|
||||
client = options[:client]
|
||||
|
||||
begin
|
||||
start = Time.now.to_f
|
||||
|
||||
options[:retry_count].times do
|
||||
if options[:retry_timeout]
|
||||
now = Time.now.to_f
|
||||
break if now - start > options[:retry_timeout]
|
||||
def clear(key)
|
||||
@client.delete(key)
|
||||
end
|
||||
|
||||
val, cas = client.get_cas(key)
|
||||
private
|
||||
|
||||
# no key has been set yet; we could simply set it, but would lead to race conditions on the initial setting
|
||||
if val.nil?
|
||||
client.set(key, "")
|
||||
next
|
||||
def get(key)
|
||||
@client.get_cas(key)
|
||||
end
|
||||
|
||||
locks = clear_expired_locks(deserialize_locks(val.to_s), options)
|
||||
|
||||
if locks.size < resources
|
||||
add_lock(locks, token)
|
||||
|
||||
newval = serialize_locks(locks)
|
||||
|
||||
if client.set_cas(key, newval, cas)
|
||||
acquisition_token = token
|
||||
break
|
||||
end
|
||||
def set(key, newval, cas)
|
||||
@client.set_cas(key, newval, cas)
|
||||
end
|
||||
|
||||
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
|
||||
end
|
||||
rescue => _
|
||||
raise FailedToAcquireLock
|
||||
end
|
||||
|
||||
acquisition_token
|
||||
end
|
||||
|
||||
def refresh(key, acquisition_token, options = {})
|
||||
options = merge_defaults(options)
|
||||
client = options[:client]
|
||||
|
||||
begin
|
||||
start = Time.now.to_f
|
||||
|
||||
options[:retry_count].times do
|
||||
if options[:retry_timeout]
|
||||
now = Time.now.to_f
|
||||
break if now - start > options[:retry_timeout]
|
||||
end
|
||||
|
||||
val, cas = client.get_cas(key)
|
||||
|
||||
# much like with initial set - ensure the key is here
|
||||
if val.nil?
|
||||
client.set(key, "")
|
||||
next
|
||||
end
|
||||
|
||||
locks = clear_expired_locks(deserialize_locks(val), options)
|
||||
|
||||
refresh_lock(locks, acquisition_token)
|
||||
|
||||
newval = serialize_locks(locks)
|
||||
|
||||
break if client.set_cas(key, newval, cas)
|
||||
|
||||
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
|
||||
end
|
||||
rescue => _
|
||||
raise FailedToAcquireLock
|
||||
end
|
||||
end
|
||||
|
||||
def unlock(key, acquisition_token, options = {})
|
||||
options = merge_defaults(options)
|
||||
client = options[:client]
|
||||
|
||||
return unless acquisition_token
|
||||
|
||||
begin
|
||||
start = Time.now.to_f
|
||||
|
||||
options[:retry_count].times do
|
||||
if options[:retry_timeout]
|
||||
now = Time.now.to_f
|
||||
break if now - start > options[:retry_timeout]
|
||||
end
|
||||
|
||||
val, cas = client.get_cas(key)
|
||||
|
||||
break if val.nil? # lock has expired totally
|
||||
|
||||
locks = clear_expired_locks(deserialize_locks(val), options)
|
||||
|
||||
acquisition_lock = remove_lock(locks, acquisition_token)
|
||||
|
||||
break unless acquisition_lock
|
||||
|
||||
newval = serialize_locks(locks)
|
||||
|
||||
break if client.set_cas(key, newval, cas)
|
||||
|
||||
# another client cleared a token in the interim - try again!
|
||||
|
||||
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
|
||||
end
|
||||
rescue => boom # rubocop:disable Lint/HandleExceptions
|
||||
# since it's optimistic locking - fine if we are unable to release
|
||||
raise boom if ENV["SUO_TEST"]
|
||||
end
|
||||
end
|
||||
|
||||
def clear(key, options = {})
|
||||
options = merge_defaults(options)
|
||||
options[:client].delete(key)
|
||||
end
|
||||
def initial_set(key)
|
||||
@client.set(key, "")
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@@ -6,160 +6,34 @@ module Suo
|
||||
super
|
||||
end
|
||||
|
||||
class << self
|
||||
def lock(key, resources = 1, options = {})
|
||||
options = merge_defaults(options)
|
||||
acquisition_token = nil
|
||||
token = SecureRandom.base64(16)
|
||||
client = options[:client]
|
||||
|
||||
begin
|
||||
start = Time.now.to_f
|
||||
|
||||
options[:retry_count].times do
|
||||
if options[:retry_timeout]
|
||||
now = Time.now.to_f
|
||||
break if now - start > options[:retry_timeout]
|
||||
def clear(key)
|
||||
@client.del(key)
|
||||
end
|
||||
|
||||
client.watch(key) do
|
||||
begin
|
||||
val = client.get(key)
|
||||
private
|
||||
|
||||
locks = clear_expired_locks(deserialize_locks(val.to_s), options)
|
||||
def get(key)
|
||||
[@client.get(key), nil]
|
||||
end
|
||||
|
||||
if locks.size < resources
|
||||
add_lock(locks, token)
|
||||
|
||||
newval = serialize_locks(locks)
|
||||
|
||||
ret = client.multi do |multi|
|
||||
def set(key, newval, _)
|
||||
ret = @client.multi do |multi|
|
||||
multi.set(key, newval)
|
||||
end
|
||||
|
||||
acquisition_token = token if ret[0] == "OK"
|
||||
ret && ret[0] == "OK"
|
||||
end
|
||||
|
||||
def synchronize(key)
|
||||
@client.watch(key) do
|
||||
yield
|
||||
end
|
||||
ensure
|
||||
client.unwatch
|
||||
end
|
||||
@client.unwatch
|
||||
end
|
||||
|
||||
break if acquisition_token
|
||||
|
||||
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
|
||||
end
|
||||
rescue => _
|
||||
raise Suo::Client::FailedToAcquireLock
|
||||
end
|
||||
|
||||
acquisition_token
|
||||
end
|
||||
|
||||
def refresh(key, acquisition_token, options = {})
|
||||
options = merge_defaults(options)
|
||||
client = options[:client]
|
||||
refreshed = false
|
||||
|
||||
begin
|
||||
start = Time.now.to_f
|
||||
|
||||
options[:retry_count].times do
|
||||
if options[:retry_timeout]
|
||||
now = Time.now.to_f
|
||||
break if now - start > options[:retry_timeout]
|
||||
end
|
||||
|
||||
client.watch(key) do
|
||||
begin
|
||||
val = client.get(key)
|
||||
|
||||
locks = clear_expired_locks(deserialize_locks(val), options)
|
||||
|
||||
refresh_lock(locks, acquisition_token)
|
||||
|
||||
newval = serialize_locks(locks)
|
||||
|
||||
ret = client.multi do |multi|
|
||||
multi.set(key, newval)
|
||||
end
|
||||
|
||||
refreshed = ret[0] == "OK"
|
||||
ensure
|
||||
client.unwatch
|
||||
end
|
||||
end
|
||||
|
||||
break if refreshed
|
||||
|
||||
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
|
||||
end
|
||||
rescue => _
|
||||
raise Suo::Client::FailedToAcquireLock
|
||||
end
|
||||
end
|
||||
|
||||
def unlock(key, acquisition_token, options = {})
|
||||
options = merge_defaults(options)
|
||||
client = options[:client]
|
||||
|
||||
return unless acquisition_token
|
||||
|
||||
begin
|
||||
start = Time.now.to_f
|
||||
|
||||
options[:retry_count].times do
|
||||
cleared = false
|
||||
|
||||
if options[:retry_timeout]
|
||||
now = Time.now.to_f
|
||||
break if now - start > options[:retry_timeout]
|
||||
end
|
||||
|
||||
client.watch(key) do
|
||||
begin
|
||||
val = client.get(key)
|
||||
|
||||
if val.nil?
|
||||
cleared = true
|
||||
break
|
||||
end
|
||||
|
||||
locks = clear_expired_locks(deserialize_locks(val), options)
|
||||
|
||||
acquisition_lock = remove_lock(locks, acquisition_token)
|
||||
|
||||
unless acquisition_lock
|
||||
# token was already cleared
|
||||
cleared = true
|
||||
break
|
||||
end
|
||||
|
||||
newval = serialize_locks(locks)
|
||||
|
||||
ret = client.multi do |multi|
|
||||
multi.set(key, newval)
|
||||
end
|
||||
|
||||
cleared = ret[0] == "OK"
|
||||
ensure
|
||||
client.unwatch
|
||||
end
|
||||
end
|
||||
|
||||
break if cleared
|
||||
|
||||
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
|
||||
end
|
||||
rescue => boom # rubocop:disable Lint/HandleExceptions
|
||||
# since it's optimistic locking - fine if we are unable to release
|
||||
raise boom if ENV["SUO_TEST"]
|
||||
end
|
||||
end
|
||||
|
||||
def clear(key, options = {})
|
||||
options = merge_defaults(options)
|
||||
options[:client].del(key)
|
||||
end
|
||||
def initial_set(key)
|
||||
@client.set(key, "")
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@@ -1,14 +0,0 @@
|
||||
require "securerandom"
|
||||
require "monitor"
|
||||
|
||||
require "dalli"
|
||||
require "dalli/cas/client"
|
||||
|
||||
require "redis"
|
||||
|
||||
require "msgpack"
|
||||
|
||||
require "suo/client/errors"
|
||||
require "suo/client/base"
|
||||
require "suo/client/memcached"
|
||||
require "suo/client/redis"
|
||||
3
lib/suo/errors.rb
Normal file
3
lib/suo/errors.rb
Normal file
@@ -0,0 +1,3 @@
|
||||
module Suo
|
||||
class LockClientError < StandardError; end
|
||||
end
|
||||
@@ -1,3 +1,3 @@
|
||||
module Suo
|
||||
VERSION = "0.1.1"
|
||||
VERSION = "0.2.1"
|
||||
end
|
||||
|
||||
@@ -20,6 +20,8 @@ Gem::Specification.new do |spec|
|
||||
spec.test_files = spec.files.grep(%r{^(test|spec|features)/})
|
||||
spec.require_paths = ["lib"]
|
||||
|
||||
spec.required_ruby_version = "~> 2.0"
|
||||
|
||||
spec.add_dependency "dalli"
|
||||
spec.add_dependency "redis"
|
||||
spec.add_dependency "msgpack"
|
||||
|
||||
@@ -3,55 +3,55 @@ require "test_helper"
|
||||
TEST_KEY = "suo_test_key".freeze
|
||||
|
||||
module ClientTests
|
||||
def test_requires_client
|
||||
exception = assert_raises(RuntimeError) do
|
||||
@klass.lock(TEST_KEY, 1)
|
||||
def test_throws_failed_error_on_bad_client
|
||||
assert_raises(Suo::LockClientError) do
|
||||
client = @client.class.new(client: {})
|
||||
client.lock(TEST_KEY, 1)
|
||||
end
|
||||
end
|
||||
|
||||
assert_equal "Client required", exception.message
|
||||
end
|
||||
|
||||
def test_class_single_resource_locking
|
||||
lock1 = @klass.lock(TEST_KEY, 1, client: @klass_client)
|
||||
def test_single_resource_locking
|
||||
lock1 = @client.lock(TEST_KEY, 1)
|
||||
refute_nil lock1
|
||||
|
||||
locked = @klass.locked?(TEST_KEY, 1, client: @klass_client)
|
||||
locked = @client.locked?(TEST_KEY, 1)
|
||||
assert_equal true, locked
|
||||
|
||||
lock2 = @klass.lock(TEST_KEY, 1, client: @klass_client)
|
||||
lock2 = @client.lock(TEST_KEY, 1)
|
||||
assert_nil lock2
|
||||
|
||||
@klass.unlock(TEST_KEY, lock1, client: @klass_client)
|
||||
@client.unlock(TEST_KEY, lock1)
|
||||
|
||||
locked = @client.locked?(TEST_KEY, 1)
|
||||
|
||||
locked = @klass.locked?(TEST_KEY, 1, client: @klass_client)
|
||||
assert_equal false, locked
|
||||
end
|
||||
|
||||
def test_class_multiple_resource_locking
|
||||
lock1 = @klass.lock(TEST_KEY, 2, client: @klass_client)
|
||||
def test_multiple_resource_locking
|
||||
lock1 = @client.lock(TEST_KEY, 2)
|
||||
refute_nil lock1
|
||||
|
||||
locked = @klass.locked?(TEST_KEY, 2, client: @klass_client)
|
||||
locked = @client.locked?(TEST_KEY, 2)
|
||||
assert_equal false, locked
|
||||
|
||||
lock2 = @klass.lock(TEST_KEY, 2, client: @klass_client)
|
||||
lock2 = @client.lock(TEST_KEY, 2)
|
||||
refute_nil lock2
|
||||
|
||||
locked = @klass.locked?(TEST_KEY, 2, client: @klass_client)
|
||||
locked = @client.locked?(TEST_KEY, 2)
|
||||
assert_equal true, locked
|
||||
|
||||
@klass.unlock(TEST_KEY, lock1, client: @klass_client)
|
||||
@client.unlock(TEST_KEY, lock1)
|
||||
|
||||
locked = @klass.locked?(TEST_KEY, 1, client: @klass_client)
|
||||
locked = @client.locked?(TEST_KEY, 1)
|
||||
assert_equal true, locked
|
||||
|
||||
@klass.unlock(TEST_KEY, lock2, client: @klass_client)
|
||||
@client.unlock(TEST_KEY, lock2)
|
||||
|
||||
locked = @klass.locked?(TEST_KEY, 1, client: @klass_client)
|
||||
locked = @client.locked?(TEST_KEY, 1)
|
||||
assert_equal false, locked
|
||||
end
|
||||
|
||||
def test_instance_single_resource_locking
|
||||
def test_block_single_resource_locking
|
||||
locked = false
|
||||
|
||||
@client.lock(TEST_KEY, 1) { locked = true }
|
||||
@@ -59,23 +59,47 @@ module ClientTests
|
||||
assert_equal true, locked
|
||||
end
|
||||
|
||||
def test_instance_unlocks_on_exception
|
||||
def test_block_unlocks_on_exception
|
||||
assert_raises(RuntimeError) do
|
||||
@client.lock(TEST_KEY, 1) { fail "Test" }
|
||||
end
|
||||
|
||||
locked = @klass.locked?(TEST_KEY, 1, client: @klass_client)
|
||||
locked = @client.locked?(TEST_KEY, 1)
|
||||
assert_equal false, locked
|
||||
end
|
||||
|
||||
def test_instance_multiple_resource_locking
|
||||
def test_readme_example
|
||||
output = Queue.new
|
||||
threads = []
|
||||
|
||||
threads << Thread.new { @client.lock(TEST_KEY, 2) { output << "One"; sleep 0.5 } }
|
||||
threads << Thread.new { @client.lock(TEST_KEY, 2) { output << "Two"; sleep 0.5 } }
|
||||
sleep 0.1
|
||||
threads << Thread.new { @client.lock(TEST_KEY, 2) { output << "Three" } }
|
||||
|
||||
threads.map(&:join)
|
||||
|
||||
ret = []
|
||||
|
||||
ret << output.pop
|
||||
ret << output.pop
|
||||
|
||||
ret.sort!
|
||||
|
||||
assert_equal 0, output.size
|
||||
assert_equal %w(One Two), ret
|
||||
end
|
||||
|
||||
def test_block_multiple_resource_locking
|
||||
success_counter = Queue.new
|
||||
failure_counter = Queue.new
|
||||
|
||||
client = @client.class.new(acquisition_timeout: 0.9, client: @client.client)
|
||||
|
||||
100.times.map do |i|
|
||||
Thread.new do
|
||||
success = @client.lock(TEST_KEY, 50, retry_timeout: 0.9) do
|
||||
sleep(1)
|
||||
success = client.lock(TEST_KEY, 50) do
|
||||
sleep(3)
|
||||
success_counter << i
|
||||
end
|
||||
|
||||
@@ -87,14 +111,16 @@ module ClientTests
|
||||
assert_equal 50, failure_counter.size
|
||||
end
|
||||
|
||||
def test_instance_multiple_resource_locking_longer_timeout
|
||||
def test_block_multiple_resource_locking_longer_timeout
|
||||
success_counter = Queue.new
|
||||
failure_counter = Queue.new
|
||||
|
||||
client = @client.class.new(acquisition_timeout: 3, client: @client.client)
|
||||
|
||||
100.times.map do |i|
|
||||
Thread.new do
|
||||
success = @client.lock(TEST_KEY, 50, retry_timeout: 2) do
|
||||
sleep(1)
|
||||
success = client.lock(TEST_KEY, 50) do
|
||||
sleep(0.5)
|
||||
success_counter << i
|
||||
end
|
||||
|
||||
@@ -109,12 +135,12 @@ end
|
||||
|
||||
class TestBaseClient < Minitest::Test
|
||||
def setup
|
||||
@klass = Suo::Client::Base
|
||||
@client = Suo::Client::Base.new(client: {})
|
||||
end
|
||||
|
||||
def test_not_implemented
|
||||
assert_raises(NotImplementedError) do
|
||||
@klass.lock(TEST_KEY, 1)
|
||||
@client.send(:get, TEST_KEY)
|
||||
end
|
||||
end
|
||||
end
|
||||
@@ -123,13 +149,12 @@ class TestMemcachedClient < Minitest::Test
|
||||
include ClientTests
|
||||
|
||||
def setup
|
||||
@klass = Suo::Client::Memcached
|
||||
@client = @klass.new
|
||||
@klass_client = Dalli::Client.new("127.0.0.1:11211")
|
||||
@dalli = Dalli::Client.new("127.0.0.1:11211")
|
||||
@client = Suo::Client::Memcached.new
|
||||
end
|
||||
|
||||
def teardown
|
||||
@klass_client.delete(TEST_KEY)
|
||||
@dalli.delete(TEST_KEY)
|
||||
end
|
||||
end
|
||||
|
||||
@@ -137,13 +162,12 @@ class TestRedisClient < Minitest::Test
|
||||
include ClientTests
|
||||
|
||||
def setup
|
||||
@klass = Suo::Client::Redis
|
||||
@client = @klass.new
|
||||
@klass_client = Redis.new
|
||||
@redis = Redis.new
|
||||
@client = Suo::Client::Redis.new
|
||||
end
|
||||
|
||||
def teardown
|
||||
@klass_client.del(TEST_KEY)
|
||||
@redis.del(TEST_KEY)
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
@@ -6,4 +6,3 @@ require "minitest/autorun"
|
||||
require "minitest/benchmark"
|
||||
|
||||
ENV["SUO_TEST"] = "true"
|
||||
|
||||
|
||||
Reference in New Issue
Block a user