mirror of
https://github.com/dkam/suo.git
synced 2025-01-29 07:42:43 +00:00
refactor shared logic into base class
This commit is contained in:
@@ -33,32 +33,86 @@ module Suo
|
|||||||
end
|
end
|
||||||
|
|
||||||
class << self
|
class << self
|
||||||
def lock(key, resources = 1, options = {}) # rubocop:disable Lint/UnusedMethodArgument
|
def lock(key, resources = 1, options = {})
|
||||||
fail NotImplementedError
|
options = merge_defaults(options)
|
||||||
|
acquisition_token = nil
|
||||||
|
token = SecureRandom.base64(16)
|
||||||
|
|
||||||
|
retry_with_timeout(key, options) do
|
||||||
|
val, cas = get(key, options)
|
||||||
|
|
||||||
|
if val.nil?
|
||||||
|
set_initial(key, options)
|
||||||
|
next
|
||||||
|
end
|
||||||
|
|
||||||
|
locks = deserialize_and_clear_locks(val, options)
|
||||||
|
|
||||||
|
if locks.size < resources
|
||||||
|
add_lock(locks, token)
|
||||||
|
|
||||||
|
newval = serialize_locks(locks)
|
||||||
|
|
||||||
|
if set(key, newval, cas, options)
|
||||||
|
acquisition_token = token
|
||||||
|
break
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
acquisition_token
|
||||||
end
|
end
|
||||||
|
|
||||||
def locked?(key, resources = 1, options = {})
|
def locked?(key, resources = 1, options = {})
|
||||||
options = merge_defaults(options)
|
locks(key, options).size >= resources
|
||||||
client = options[:client]
|
|
||||||
locks = deserialize_locks(client.get(key))
|
|
||||||
|
|
||||||
locks.size >= resources
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def locks(key, options)
|
def locks(key, options)
|
||||||
options = merge_defaults(options)
|
options = merge_defaults(options)
|
||||||
client = options[:client]
|
val, _ = get(key, options)
|
||||||
locks = deserialize_locks(client.get(key))
|
locks = deserialize_locks(val)
|
||||||
|
|
||||||
locks.size
|
locks
|
||||||
end
|
end
|
||||||
|
|
||||||
def refresh(key, acquisition_token, options = {}) # rubocop:disable Lint/UnusedMethodArgument
|
def refresh(key, acquisition_token, options = {})
|
||||||
fail NotImplementedError
|
options = merge_defaults(options)
|
||||||
|
|
||||||
|
retry_with_timeout(key, options) do
|
||||||
|
val, cas = get(key, options)
|
||||||
|
|
||||||
|
if val.nil?
|
||||||
|
set_initial(key, options)
|
||||||
|
next
|
||||||
end
|
end
|
||||||
|
|
||||||
def unlock(key, acquisition_token, options = {}) # rubocop:disable Lint/UnusedMethodArgument
|
locks = deserialize_and_clear_locks(val, options)
|
||||||
fail NotImplementedError
|
|
||||||
|
refresh_lock(locks, acquisition_token)
|
||||||
|
|
||||||
|
break if set(key, serialize_locks(locks), cas, options)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def unlock(key, acquisition_token, options = {})
|
||||||
|
options = merge_defaults(options)
|
||||||
|
|
||||||
|
return unless acquisition_token
|
||||||
|
|
||||||
|
retry_with_timeout(key, options) do
|
||||||
|
val, cas = get(key, options)
|
||||||
|
|
||||||
|
break if val.nil?
|
||||||
|
|
||||||
|
locks = deserialize_and_clear_locks(val, options)
|
||||||
|
|
||||||
|
acquisition_lock = remove_lock(locks, acquisition_token)
|
||||||
|
|
||||||
|
break unless acquisition_lock
|
||||||
|
break if set(key, serialize_locks(locks), cas, options)
|
||||||
|
end
|
||||||
|
rescue FailedToAcquireLock => _ # rubocop:disable Lint/HandleExceptions
|
||||||
|
# ignore - assume success due to optimistic locking
|
||||||
end
|
end
|
||||||
|
|
||||||
def clear(key, options = {}) # rubocop:disable Lint/UnusedMethodArgument
|
def clear(key, options = {}) # rubocop:disable Lint/UnusedMethodArgument
|
||||||
@@ -81,10 +135,49 @@ module Suo
|
|||||||
|
|
||||||
private
|
private
|
||||||
|
|
||||||
|
def get(key, options) # rubocop:disable Lint/UnusedMethodArgument
|
||||||
|
fail NotImplementedError
|
||||||
|
end
|
||||||
|
|
||||||
|
def set(key, newval, oldval, options) # rubocop:disable Lint/UnusedMethodArgument
|
||||||
|
fail NotImplementedError
|
||||||
|
end
|
||||||
|
|
||||||
|
def set_initial(key, options) # rubocop:disable Lint/UnusedMethodArgument
|
||||||
|
fail NotImplementedError
|
||||||
|
end
|
||||||
|
|
||||||
|
def synchronize(key, options)
|
||||||
|
yield(key, options)
|
||||||
|
end
|
||||||
|
|
||||||
|
def retry_with_timeout(key, options)
|
||||||
|
start = Time.now.to_f
|
||||||
|
|
||||||
|
options[:retry_count].times do
|
||||||
|
if options[:retry_timeout]
|
||||||
|
now = Time.now.to_f
|
||||||
|
break if now - start > options[:retry_timeout]
|
||||||
|
end
|
||||||
|
|
||||||
|
synchronize(key, options) do
|
||||||
|
yield
|
||||||
|
end
|
||||||
|
|
||||||
|
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
|
||||||
|
end
|
||||||
|
rescue => _
|
||||||
|
raise FailedToAcquireLock
|
||||||
|
end
|
||||||
|
|
||||||
def serialize_locks(locks)
|
def serialize_locks(locks)
|
||||||
MessagePack.pack(locks.map { |time, token| [time.to_f, token] })
|
MessagePack.pack(locks.map { |time, token| [time.to_f, token] })
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def deserialize_and_clear_locks(val, options)
|
||||||
|
clear_expired_locks(deserialize_locks(val), options)
|
||||||
|
end
|
||||||
|
|
||||||
def deserialize_locks(val)
|
def deserialize_locks(val)
|
||||||
MessagePack.unpack(val).map do |time, token|
|
MessagePack.unpack(val).map do |time, token|
|
||||||
[Time.at(time), token]
|
[Time.at(time), token]
|
||||||
|
|||||||
@@ -7,130 +7,24 @@ module Suo
|
|||||||
end
|
end
|
||||||
|
|
||||||
class << self
|
class << self
|
||||||
def lock(key, resources = 1, options = {})
|
|
||||||
options = merge_defaults(options)
|
|
||||||
acquisition_token = nil
|
|
||||||
token = SecureRandom.base64(16)
|
|
||||||
client = options[:client]
|
|
||||||
|
|
||||||
begin
|
|
||||||
start = Time.now.to_f
|
|
||||||
|
|
||||||
options[:retry_count].times do
|
|
||||||
if options[:retry_timeout]
|
|
||||||
now = Time.now.to_f
|
|
||||||
break if now - start > options[:retry_timeout]
|
|
||||||
end
|
|
||||||
|
|
||||||
val, cas = client.get_cas(key)
|
|
||||||
|
|
||||||
# no key has been set yet; we could simply set it, but would lead to race conditions on the initial setting
|
|
||||||
if val.nil?
|
|
||||||
client.set(key, "")
|
|
||||||
next
|
|
||||||
end
|
|
||||||
|
|
||||||
locks = clear_expired_locks(deserialize_locks(val.to_s), options)
|
|
||||||
|
|
||||||
if locks.size < resources
|
|
||||||
add_lock(locks, token)
|
|
||||||
|
|
||||||
newval = serialize_locks(locks)
|
|
||||||
|
|
||||||
if client.set_cas(key, newval, cas)
|
|
||||||
acquisition_token = token
|
|
||||||
break
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
|
|
||||||
end
|
|
||||||
rescue => _
|
|
||||||
raise FailedToAcquireLock
|
|
||||||
end
|
|
||||||
|
|
||||||
acquisition_token
|
|
||||||
end
|
|
||||||
|
|
||||||
def refresh(key, acquisition_token, options = {})
|
|
||||||
options = merge_defaults(options)
|
|
||||||
client = options[:client]
|
|
||||||
|
|
||||||
begin
|
|
||||||
start = Time.now.to_f
|
|
||||||
|
|
||||||
options[:retry_count].times do
|
|
||||||
if options[:retry_timeout]
|
|
||||||
now = Time.now.to_f
|
|
||||||
break if now - start > options[:retry_timeout]
|
|
||||||
end
|
|
||||||
|
|
||||||
val, cas = client.get_cas(key)
|
|
||||||
|
|
||||||
# much like with initial set - ensure the key is here
|
|
||||||
if val.nil?
|
|
||||||
client.set(key, "")
|
|
||||||
next
|
|
||||||
end
|
|
||||||
|
|
||||||
locks = clear_expired_locks(deserialize_locks(val), options)
|
|
||||||
|
|
||||||
refresh_lock(locks, acquisition_token)
|
|
||||||
|
|
||||||
newval = serialize_locks(locks)
|
|
||||||
|
|
||||||
break if client.set_cas(key, newval, cas)
|
|
||||||
|
|
||||||
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
|
|
||||||
end
|
|
||||||
rescue => _
|
|
||||||
raise FailedToAcquireLock
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def unlock(key, acquisition_token, options = {})
|
|
||||||
options = merge_defaults(options)
|
|
||||||
client = options[:client]
|
|
||||||
|
|
||||||
return unless acquisition_token
|
|
||||||
|
|
||||||
begin
|
|
||||||
start = Time.now.to_f
|
|
||||||
|
|
||||||
options[:retry_count].times do
|
|
||||||
if options[:retry_timeout]
|
|
||||||
now = Time.now.to_f
|
|
||||||
break if now - start > options[:retry_timeout]
|
|
||||||
end
|
|
||||||
|
|
||||||
val, cas = client.get_cas(key)
|
|
||||||
|
|
||||||
break if val.nil? # lock has expired totally
|
|
||||||
|
|
||||||
locks = clear_expired_locks(deserialize_locks(val), options)
|
|
||||||
|
|
||||||
acquisition_lock = remove_lock(locks, acquisition_token)
|
|
||||||
|
|
||||||
break unless acquisition_lock
|
|
||||||
|
|
||||||
newval = serialize_locks(locks)
|
|
||||||
|
|
||||||
break if client.set_cas(key, newval, cas)
|
|
||||||
|
|
||||||
# another client cleared a token in the interim - try again!
|
|
||||||
|
|
||||||
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
|
|
||||||
end
|
|
||||||
rescue => boom # rubocop:disable Lint/HandleExceptions
|
|
||||||
# since it's optimistic locking - fine if we are unable to release
|
|
||||||
raise boom if ENV["SUO_TEST"]
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def clear(key, options = {})
|
def clear(key, options = {})
|
||||||
options = merge_defaults(options)
|
options = merge_defaults(options)
|
||||||
options[:client].delete(key)
|
options[:client].delete(key)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
private
|
||||||
|
|
||||||
|
def get(key, options)
|
||||||
|
options[:client].get_cas(key)
|
||||||
|
end
|
||||||
|
|
||||||
|
def set(key, newval, cas, options)
|
||||||
|
options[:client].set_cas(key, newval, cas)
|
||||||
|
end
|
||||||
|
|
||||||
|
def set_initial(key, options)
|
||||||
|
options[:client].set(key, "")
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|||||||
@@ -7,159 +7,36 @@ module Suo
|
|||||||
end
|
end
|
||||||
|
|
||||||
class << self
|
class << self
|
||||||
def lock(key, resources = 1, options = {})
|
|
||||||
options = merge_defaults(options)
|
|
||||||
acquisition_token = nil
|
|
||||||
token = SecureRandom.base64(16)
|
|
||||||
client = options[:client]
|
|
||||||
|
|
||||||
begin
|
|
||||||
start = Time.now.to_f
|
|
||||||
|
|
||||||
options[:retry_count].times do
|
|
||||||
if options[:retry_timeout]
|
|
||||||
now = Time.now.to_f
|
|
||||||
break if now - start > options[:retry_timeout]
|
|
||||||
end
|
|
||||||
|
|
||||||
client.watch(key) do
|
|
||||||
begin
|
|
||||||
val = client.get(key)
|
|
||||||
|
|
||||||
locks = clear_expired_locks(deserialize_locks(val.to_s), options)
|
|
||||||
|
|
||||||
if locks.size < resources
|
|
||||||
add_lock(locks, token)
|
|
||||||
|
|
||||||
newval = serialize_locks(locks)
|
|
||||||
|
|
||||||
ret = client.multi do |multi|
|
|
||||||
multi.set(key, newval)
|
|
||||||
end
|
|
||||||
|
|
||||||
acquisition_token = token if ret[0] == "OK"
|
|
||||||
end
|
|
||||||
ensure
|
|
||||||
client.unwatch
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
break if acquisition_token
|
|
||||||
|
|
||||||
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
|
|
||||||
end
|
|
||||||
rescue => _
|
|
||||||
raise Suo::Client::FailedToAcquireLock
|
|
||||||
end
|
|
||||||
|
|
||||||
acquisition_token
|
|
||||||
end
|
|
||||||
|
|
||||||
def refresh(key, acquisition_token, options = {})
|
|
||||||
options = merge_defaults(options)
|
|
||||||
client = options[:client]
|
|
||||||
refreshed = false
|
|
||||||
|
|
||||||
begin
|
|
||||||
start = Time.now.to_f
|
|
||||||
|
|
||||||
options[:retry_count].times do
|
|
||||||
if options[:retry_timeout]
|
|
||||||
now = Time.now.to_f
|
|
||||||
break if now - start > options[:retry_timeout]
|
|
||||||
end
|
|
||||||
|
|
||||||
client.watch(key) do
|
|
||||||
begin
|
|
||||||
val = client.get(key)
|
|
||||||
|
|
||||||
locks = clear_expired_locks(deserialize_locks(val), options)
|
|
||||||
|
|
||||||
refresh_lock(locks, acquisition_token)
|
|
||||||
|
|
||||||
newval = serialize_locks(locks)
|
|
||||||
|
|
||||||
ret = client.multi do |multi|
|
|
||||||
multi.set(key, newval)
|
|
||||||
end
|
|
||||||
|
|
||||||
refreshed = ret[0] == "OK"
|
|
||||||
ensure
|
|
||||||
client.unwatch
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
break if refreshed
|
|
||||||
|
|
||||||
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
|
|
||||||
end
|
|
||||||
rescue => _
|
|
||||||
raise Suo::Client::FailedToAcquireLock
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def unlock(key, acquisition_token, options = {})
|
|
||||||
options = merge_defaults(options)
|
|
||||||
client = options[:client]
|
|
||||||
|
|
||||||
return unless acquisition_token
|
|
||||||
|
|
||||||
begin
|
|
||||||
start = Time.now.to_f
|
|
||||||
|
|
||||||
options[:retry_count].times do
|
|
||||||
cleared = false
|
|
||||||
|
|
||||||
if options[:retry_timeout]
|
|
||||||
now = Time.now.to_f
|
|
||||||
break if now - start > options[:retry_timeout]
|
|
||||||
end
|
|
||||||
|
|
||||||
client.watch(key) do
|
|
||||||
begin
|
|
||||||
val = client.get(key)
|
|
||||||
|
|
||||||
if val.nil?
|
|
||||||
cleared = true
|
|
||||||
break
|
|
||||||
end
|
|
||||||
|
|
||||||
locks = clear_expired_locks(deserialize_locks(val), options)
|
|
||||||
|
|
||||||
acquisition_lock = remove_lock(locks, acquisition_token)
|
|
||||||
|
|
||||||
unless acquisition_lock
|
|
||||||
# token was already cleared
|
|
||||||
cleared = true
|
|
||||||
break
|
|
||||||
end
|
|
||||||
|
|
||||||
newval = serialize_locks(locks)
|
|
||||||
|
|
||||||
ret = client.multi do |multi|
|
|
||||||
multi.set(key, newval)
|
|
||||||
end
|
|
||||||
|
|
||||||
cleared = ret[0] == "OK"
|
|
||||||
ensure
|
|
||||||
client.unwatch
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
break if cleared
|
|
||||||
|
|
||||||
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
|
|
||||||
end
|
|
||||||
rescue => boom # rubocop:disable Lint/HandleExceptions
|
|
||||||
# since it's optimistic locking - fine if we are unable to release
|
|
||||||
raise boom if ENV["SUO_TEST"]
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def clear(key, options = {})
|
def clear(key, options = {})
|
||||||
options = merge_defaults(options)
|
options = merge_defaults(options)
|
||||||
options[:client].del(key)
|
options[:client].del(key)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
private
|
||||||
|
|
||||||
|
def get(key, options)
|
||||||
|
[options[:client].get(key), nil]
|
||||||
|
end
|
||||||
|
|
||||||
|
def set(key, newval, _, options)
|
||||||
|
ret = options[:client].multi do |multi|
|
||||||
|
multi.set(key, newval)
|
||||||
|
end
|
||||||
|
|
||||||
|
ret[0] == "OK"
|
||||||
|
end
|
||||||
|
|
||||||
|
def synchronize(key, options)
|
||||||
|
options[:client].watch(key) do
|
||||||
|
yield
|
||||||
|
end
|
||||||
|
ensure
|
||||||
|
options[:client].unwatch
|
||||||
|
end
|
||||||
|
|
||||||
|
def set_initial(key, options)
|
||||||
|
options[:client].set(key, "")
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|||||||
@@ -24,6 +24,7 @@ module ClientTests
|
|||||||
@klass.unlock(TEST_KEY, lock1, client: @klass_client)
|
@klass.unlock(TEST_KEY, lock1, client: @klass_client)
|
||||||
|
|
||||||
locked = @klass.locked?(TEST_KEY, 1, client: @klass_client)
|
locked = @klass.locked?(TEST_KEY, 1, client: @klass_client)
|
||||||
|
|
||||||
assert_equal false, locked
|
assert_equal false, locked
|
||||||
end
|
end
|
||||||
|
|
||||||
@@ -75,7 +76,7 @@ module ClientTests
|
|||||||
100.times.map do |i|
|
100.times.map do |i|
|
||||||
Thread.new do
|
Thread.new do
|
||||||
success = @client.lock(TEST_KEY, 50, retry_timeout: 0.9) do
|
success = @client.lock(TEST_KEY, 50, retry_timeout: 0.9) do
|
||||||
sleep(1)
|
sleep(1.1)
|
||||||
success_counter << i
|
success_counter << i
|
||||||
end
|
end
|
||||||
|
|
||||||
@@ -114,7 +115,7 @@ class TestBaseClient < Minitest::Test
|
|||||||
|
|
||||||
def test_not_implemented
|
def test_not_implemented
|
||||||
assert_raises(NotImplementedError) do
|
assert_raises(NotImplementedError) do
|
||||||
@klass.lock(TEST_KEY, 1)
|
@klass.send(:get, TEST_KEY, {})
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|||||||
Reference in New Issue
Block a user