From c2b9de4cf3593be105c80f06acea021db8f41026 Mon Sep 17 00:00:00 2001 From: Nick Elser Date: Sun, 12 Apr 2015 17:54:30 -0700 Subject: [PATCH] refactor shared logic into base class --- lib/suo/client/base.rb | 121 ++++++++++++++++++++++--- lib/suo/client/memcached.rb | 134 +++------------------------ lib/suo/client/redis.rb | 175 ++++++------------------------------ test/client_test.rb | 5 +- 4 files changed, 150 insertions(+), 285 deletions(-) diff --git a/lib/suo/client/base.rb b/lib/suo/client/base.rb index c5cef55..caec55c 100644 --- a/lib/suo/client/base.rb +++ b/lib/suo/client/base.rb @@ -33,32 +33,86 @@ module Suo end class << self - def lock(key, resources = 1, options = {}) # rubocop:disable Lint/UnusedMethodArgument - fail NotImplementedError + def lock(key, resources = 1, options = {}) + options = merge_defaults(options) + acquisition_token = nil + token = SecureRandom.base64(16) + + retry_with_timeout(key, options) do + val, cas = get(key, options) + + if val.nil? + set_initial(key, options) + next + end + + locks = deserialize_and_clear_locks(val, options) + + if locks.size < resources + add_lock(locks, token) + + newval = serialize_locks(locks) + + if set(key, newval, cas, options) + acquisition_token = token + break + end + end + end + + acquisition_token end def locked?(key, resources = 1, options = {}) - options = merge_defaults(options) - client = options[:client] - locks = deserialize_locks(client.get(key)) - - locks.size >= resources + locks(key, options).size >= resources end def locks(key, options) options = merge_defaults(options) - client = options[:client] - locks = deserialize_locks(client.get(key)) + val, _ = get(key, options) + locks = deserialize_locks(val) - locks.size + locks end - def refresh(key, acquisition_token, options = {}) # rubocop:disable Lint/UnusedMethodArgument - fail NotImplementedError + def refresh(key, acquisition_token, options = {}) + options = merge_defaults(options) + + retry_with_timeout(key, options) do + val, cas = get(key, options) + + if val.nil? + set_initial(key, options) + next + end + + locks = deserialize_and_clear_locks(val, options) + + refresh_lock(locks, acquisition_token) + + break if set(key, serialize_locks(locks), cas, options) + end end - def unlock(key, acquisition_token, options = {}) # rubocop:disable Lint/UnusedMethodArgument - fail NotImplementedError + def unlock(key, acquisition_token, options = {}) + options = merge_defaults(options) + + return unless acquisition_token + + retry_with_timeout(key, options) do + val, cas = get(key, options) + + break if val.nil? + + locks = deserialize_and_clear_locks(val, options) + + acquisition_lock = remove_lock(locks, acquisition_token) + + break unless acquisition_lock + break if set(key, serialize_locks(locks), cas, options) + end + rescue FailedToAcquireLock => _ # rubocop:disable Lint/HandleExceptions + # ignore - assume success due to optimistic locking end def clear(key, options = {}) # rubocop:disable Lint/UnusedMethodArgument @@ -81,10 +135,49 @@ module Suo private + def get(key, options) # rubocop:disable Lint/UnusedMethodArgument + fail NotImplementedError + end + + def set(key, newval, oldval, options) # rubocop:disable Lint/UnusedMethodArgument + fail NotImplementedError + end + + def set_initial(key, options) # rubocop:disable Lint/UnusedMethodArgument + fail NotImplementedError + end + + def synchronize(key, options) + yield(key, options) + end + + def retry_with_timeout(key, options) + start = Time.now.to_f + + options[:retry_count].times do + if options[:retry_timeout] + now = Time.now.to_f + break if now - start > options[:retry_timeout] + end + + synchronize(key, options) do + yield + end + + sleep(rand(options[:retry_delay] * 1000).to_f / 1000) + end + rescue => _ + raise FailedToAcquireLock + end + def serialize_locks(locks) MessagePack.pack(locks.map { |time, token| [time.to_f, token] }) end + def deserialize_and_clear_locks(val, options) + clear_expired_locks(deserialize_locks(val), options) + end + def deserialize_locks(val) MessagePack.unpack(val).map do |time, token| [Time.at(time), token] diff --git a/lib/suo/client/memcached.rb b/lib/suo/client/memcached.rb index 3c69afd..01767ed 100644 --- a/lib/suo/client/memcached.rb +++ b/lib/suo/client/memcached.rb @@ -7,130 +7,24 @@ module Suo end class << self - def lock(key, resources = 1, options = {}) - options = merge_defaults(options) - acquisition_token = nil - token = SecureRandom.base64(16) - client = options[:client] - - begin - start = Time.now.to_f - - options[:retry_count].times do - if options[:retry_timeout] - now = Time.now.to_f - break if now - start > options[:retry_timeout] - end - - val, cas = client.get_cas(key) - - # no key has been set yet; we could simply set it, but would lead to race conditions on the initial setting - if val.nil? - client.set(key, "") - next - end - - locks = clear_expired_locks(deserialize_locks(val.to_s), options) - - if locks.size < resources - add_lock(locks, token) - - newval = serialize_locks(locks) - - if client.set_cas(key, newval, cas) - acquisition_token = token - break - end - end - - sleep(rand(options[:retry_delay] * 1000).to_f / 1000) - end - rescue => _ - raise FailedToAcquireLock - end - - acquisition_token - end - - def refresh(key, acquisition_token, options = {}) - options = merge_defaults(options) - client = options[:client] - - begin - start = Time.now.to_f - - options[:retry_count].times do - if options[:retry_timeout] - now = Time.now.to_f - break if now - start > options[:retry_timeout] - end - - val, cas = client.get_cas(key) - - # much like with initial set - ensure the key is here - if val.nil? - client.set(key, "") - next - end - - locks = clear_expired_locks(deserialize_locks(val), options) - - refresh_lock(locks, acquisition_token) - - newval = serialize_locks(locks) - - break if client.set_cas(key, newval, cas) - - sleep(rand(options[:retry_delay] * 1000).to_f / 1000) - end - rescue => _ - raise FailedToAcquireLock - end - end - - def unlock(key, acquisition_token, options = {}) - options = merge_defaults(options) - client = options[:client] - - return unless acquisition_token - - begin - start = Time.now.to_f - - options[:retry_count].times do - if options[:retry_timeout] - now = Time.now.to_f - break if now - start > options[:retry_timeout] - end - - val, cas = client.get_cas(key) - - break if val.nil? # lock has expired totally - - locks = clear_expired_locks(deserialize_locks(val), options) - - acquisition_lock = remove_lock(locks, acquisition_token) - - break unless acquisition_lock - - newval = serialize_locks(locks) - - break if client.set_cas(key, newval, cas) - - # another client cleared a token in the interim - try again! - - sleep(rand(options[:retry_delay] * 1000).to_f / 1000) - end - rescue => boom # rubocop:disable Lint/HandleExceptions - # since it's optimistic locking - fine if we are unable to release - raise boom if ENV["SUO_TEST"] - end - end - def clear(key, options = {}) options = merge_defaults(options) options[:client].delete(key) end + + private + + def get(key, options) + options[:client].get_cas(key) + end + + def set(key, newval, cas, options) + options[:client].set_cas(key, newval, cas) + end + + def set_initial(key, options) + options[:client].set(key, "") + end end end end diff --git a/lib/suo/client/redis.rb b/lib/suo/client/redis.rb index 9d6275b..0b57d06 100644 --- a/lib/suo/client/redis.rb +++ b/lib/suo/client/redis.rb @@ -7,159 +7,36 @@ module Suo end class << self - def lock(key, resources = 1, options = {}) - options = merge_defaults(options) - acquisition_token = nil - token = SecureRandom.base64(16) - client = options[:client] - - begin - start = Time.now.to_f - - options[:retry_count].times do - if options[:retry_timeout] - now = Time.now.to_f - break if now - start > options[:retry_timeout] - end - - client.watch(key) do - begin - val = client.get(key) - - locks = clear_expired_locks(deserialize_locks(val.to_s), options) - - if locks.size < resources - add_lock(locks, token) - - newval = serialize_locks(locks) - - ret = client.multi do |multi| - multi.set(key, newval) - end - - acquisition_token = token if ret[0] == "OK" - end - ensure - client.unwatch - end - end - - break if acquisition_token - - sleep(rand(options[:retry_delay] * 1000).to_f / 1000) - end - rescue => _ - raise Suo::Client::FailedToAcquireLock - end - - acquisition_token - end - - def refresh(key, acquisition_token, options = {}) - options = merge_defaults(options) - client = options[:client] - refreshed = false - - begin - start = Time.now.to_f - - options[:retry_count].times do - if options[:retry_timeout] - now = Time.now.to_f - break if now - start > options[:retry_timeout] - end - - client.watch(key) do - begin - val = client.get(key) - - locks = clear_expired_locks(deserialize_locks(val), options) - - refresh_lock(locks, acquisition_token) - - newval = serialize_locks(locks) - - ret = client.multi do |multi| - multi.set(key, newval) - end - - refreshed = ret[0] == "OK" - ensure - client.unwatch - end - end - - break if refreshed - - sleep(rand(options[:retry_delay] * 1000).to_f / 1000) - end - rescue => _ - raise Suo::Client::FailedToAcquireLock - end - end - - def unlock(key, acquisition_token, options = {}) - options = merge_defaults(options) - client = options[:client] - - return unless acquisition_token - - begin - start = Time.now.to_f - - options[:retry_count].times do - cleared = false - - if options[:retry_timeout] - now = Time.now.to_f - break if now - start > options[:retry_timeout] - end - - client.watch(key) do - begin - val = client.get(key) - - if val.nil? - cleared = true - break - end - - locks = clear_expired_locks(deserialize_locks(val), options) - - acquisition_lock = remove_lock(locks, acquisition_token) - - unless acquisition_lock - # token was already cleared - cleared = true - break - end - - newval = serialize_locks(locks) - - ret = client.multi do |multi| - multi.set(key, newval) - end - - cleared = ret[0] == "OK" - ensure - client.unwatch - end - end - - break if cleared - - sleep(rand(options[:retry_delay] * 1000).to_f / 1000) - end - rescue => boom # rubocop:disable Lint/HandleExceptions - # since it's optimistic locking - fine if we are unable to release - raise boom if ENV["SUO_TEST"] - end - end - def clear(key, options = {}) options = merge_defaults(options) options[:client].del(key) end + + private + + def get(key, options) + [options[:client].get(key), nil] + end + + def set(key, newval, _, options) + ret = options[:client].multi do |multi| + multi.set(key, newval) + end + + ret[0] == "OK" + end + + def synchronize(key, options) + options[:client].watch(key) do + yield + end + ensure + options[:client].unwatch + end + + def set_initial(key, options) + options[:client].set(key, "") + end end end end diff --git a/test/client_test.rb b/test/client_test.rb index 8ad233b..059e1bc 100644 --- a/test/client_test.rb +++ b/test/client_test.rb @@ -24,6 +24,7 @@ module ClientTests @klass.unlock(TEST_KEY, lock1, client: @klass_client) locked = @klass.locked?(TEST_KEY, 1, client: @klass_client) + assert_equal false, locked end @@ -75,7 +76,7 @@ module ClientTests 100.times.map do |i| Thread.new do success = @client.lock(TEST_KEY, 50, retry_timeout: 0.9) do - sleep(1) + sleep(1.1) success_counter << i end @@ -114,7 +115,7 @@ class TestBaseClient < Minitest::Test def test_not_implemented assert_raises(NotImplementedError) do - @klass.lock(TEST_KEY, 1) + @klass.send(:get, TEST_KEY, {}) end end end