mirror of
https://github.com/dkam/suo.git
synced 2025-01-29 07:42:43 +00:00
Fix #initial_set which is causing a double attempt and delay on lock acquisition and incorrect drop on short acquisition_timeout
190 lines
4.4 KiB
Ruby
190 lines
4.4 KiB
Ruby
module Suo
|
|
module Client
|
|
class Base
|
|
DEFAULT_OPTIONS = {
|
|
acquisition_timeout: 0.1,
|
|
acquisition_delay: 0.01,
|
|
stale_lock_expiration: 3600,
|
|
resources: 1,
|
|
ttl: 60,
|
|
}.freeze
|
|
|
|
BLANK_STR = "".freeze
|
|
|
|
attr_accessor :client, :key, :resources, :options
|
|
|
|
include MonitorMixin
|
|
|
|
def initialize(key, options = {})
|
|
fail "Client required" unless options[:client]
|
|
|
|
@options = DEFAULT_OPTIONS.merge(options)
|
|
@retry_count = (@options[:acquisition_timeout] / @options[:acquisition_delay].to_f).ceil
|
|
@client = @options[:client]
|
|
@resources = @options[:resources].to_i
|
|
@key = key
|
|
|
|
super() # initialize Monitor mixin for thread safety
|
|
end
|
|
|
|
def lock(custom_token = nil)
|
|
token = acquire_lock(custom_token)
|
|
|
|
if block_given? && token
|
|
begin
|
|
yield
|
|
ensure
|
|
unlock(token)
|
|
end
|
|
else
|
|
token
|
|
end
|
|
end
|
|
|
|
def locked?
|
|
locks.size >= resources
|
|
end
|
|
|
|
def locks
|
|
val, _ = get
|
|
cleared_locks = deserialize_and_clear_locks(val)
|
|
|
|
cleared_locks
|
|
end
|
|
|
|
def refresh(token)
|
|
retry_with_timeout do
|
|
val, cas = get
|
|
|
|
cas = initial_set if val.nil?
|
|
|
|
cleared_locks = deserialize_and_clear_locks(val)
|
|
|
|
refresh_lock(cleared_locks, token)
|
|
|
|
break if set(serialize_locks(cleared_locks), cas, expire: cleared_locks.empty?)
|
|
end
|
|
end
|
|
|
|
def unlock(token)
|
|
return unless token
|
|
|
|
retry_with_timeout do
|
|
val, cas = get
|
|
|
|
break if val.nil?
|
|
|
|
cleared_locks = deserialize_and_clear_locks(val)
|
|
|
|
acquisition_lock = remove_lock(cleared_locks, token)
|
|
|
|
break unless acquisition_lock
|
|
break if set(serialize_locks(cleared_locks), cas, expire: cleared_locks.empty?)
|
|
end
|
|
rescue LockClientError => _ # rubocop:disable Lint/HandleExceptions
|
|
# ignore - assume success due to optimistic locking
|
|
end
|
|
|
|
def clear
|
|
fail NotImplementedError
|
|
end
|
|
|
|
private
|
|
|
|
attr_accessor :retry_count
|
|
|
|
def acquire_lock(token = nil)
|
|
token ||= SecureRandom.base64(16)
|
|
|
|
retry_with_timeout do
|
|
val, cas = get
|
|
|
|
cas = initial_set if val.nil?
|
|
|
|
cleared_locks = deserialize_and_clear_locks(val)
|
|
|
|
if cleared_locks.size < resources
|
|
add_lock(cleared_locks, token)
|
|
|
|
newval = serialize_locks(cleared_locks)
|
|
|
|
return token if set(newval, cas)
|
|
end
|
|
end
|
|
|
|
nil
|
|
end
|
|
|
|
def get
|
|
fail NotImplementedError
|
|
end
|
|
|
|
def set(newval, cas) # rubocop:disable Lint/UnusedMethodArgument
|
|
fail NotImplementedError
|
|
end
|
|
|
|
def initial_set(val = BLANK_STR) # rubocop:disable Lint/UnusedMethodArgument
|
|
fail NotImplementedError
|
|
end
|
|
|
|
def synchronize
|
|
mon_synchronize { yield }
|
|
end
|
|
|
|
def retry_with_timeout
|
|
start = Time.now.to_f
|
|
|
|
retry_count.times do
|
|
elapsed = Time.now.to_f - start
|
|
break if elapsed >= options[:acquisition_timeout]
|
|
|
|
synchronize do
|
|
yield
|
|
end
|
|
|
|
sleep(rand(options[:acquisition_delay] * 1000).to_f / 1000)
|
|
end
|
|
rescue => _
|
|
raise LockClientError
|
|
end
|
|
|
|
def serialize_locks(locks)
|
|
MessagePack.pack(locks.map { |time, token| [time.to_f, token] })
|
|
end
|
|
|
|
def deserialize_and_clear_locks(val)
|
|
clear_expired_locks(deserialize_locks(val))
|
|
end
|
|
|
|
def deserialize_locks(val)
|
|
unpacked = (val.nil? || val == BLANK_STR) ? [] : MessagePack.unpack(val)
|
|
|
|
unpacked.map do |time, token|
|
|
[Time.at(time), token]
|
|
end
|
|
rescue EOFError, MessagePack::MalformedFormatError => _
|
|
[]
|
|
end
|
|
|
|
def clear_expired_locks(locks)
|
|
expired = Time.now - options[:stale_lock_expiration]
|
|
locks.reject { |time, _| time < expired }
|
|
end
|
|
|
|
def add_lock(locks, token, time = Time.now.to_f)
|
|
locks << [time, token]
|
|
end
|
|
|
|
def remove_lock(locks, acquisition_token)
|
|
lock = locks.find { |_, token| token == acquisition_token }
|
|
locks.delete(lock)
|
|
end
|
|
|
|
def refresh_lock(locks, acquisition_token)
|
|
remove_lock(locks, acquisition_token)
|
|
add_lock(locks, acquisition_token)
|
|
end
|
|
end
|
|
end
|
|
end
|