85 Commits

Author SHA1 Message Date
Dan Milne
59d3385d8a Remove code which generates a deprecation warning 2023-01-26 13:04:49 +11:00
Dan Milne
3d5c102c08 Merge upstream/master 2023-01-26 13:03:05 +11:00
Nick Elser
7bb28cc007 update some badges 2021-01-21 10:38:29 -08:00
Nick Elser
65cae9aa58 update changelog & version 2021-01-21 10:32:06 -08:00
Nick Elser
6b6eb4e590 Merge pull request #15 from doits/monotonic_time
use monotonic time for retry timeout check
2021-01-21 10:27:46 -08:00
Nick Elser
485f2bff37 Merge pull request #12 from mlarraz/redis_pool
Support connection pooled Redis
2021-01-21 10:25:56 -08:00
Matt Larraz
3a37a74982 Support connection pooled Redis 2021-01-20 16:47:30 -08:00
Nick Elser
b02c256c25 Merge pull request #16 from mlarraz/ruby_versions
Switch to Github Actions and update supported Ruby versions
2021-01-20 16:45:40 -08:00
Matt Larraz
d57f6a15ac Switch to Github Actions and update supported Ruby versions 2021-01-20 16:08:37 -08:00
Dan Milne
d293ef6fcf Allow Ruby 3 2021-01-11 11:29:17 +11:00
Markus Doits
6e74322ff1 use monotonic time for retry timeout check 2021-01-07 12:52:21 +01:00
Nick Elser
b9d3f1b7a1 Bump version and update changelog. 2019-09-04 14:36:40 -07:00
Nick Elser
270c05b80e Merge pull request #10 from levkk/levkk/support-pooled-clients
Add support for pooled memcached clients by using #with
2019-09-04 14:33:26 -07:00
Lev Kokotov
60e167e146 relax bundler dependency version 2019-08-22 16:16:57 -07:00
Lev Kokotov
ad08c8b5ea bump Bundler to 2 2019-08-22 16:14:01 -07:00
Lev Kokotov
9b8ef6c244 Add support for pooled memcached clients by using #with 2019-08-22 16:04:14 -07:00
Nick Elser
b8a1d7d9ac Merge pull request #8 from nickelser/update_changelog
Update changelog
2018-10-05 14:06:42 -07:00
Nick Elser
c58a247156 Run on more modern rubies, as well. 2018-10-05 16:40:25 -04:00
Nick Elser
8c37c24ee6 Merge branch 'master' into update_changelog 2018-10-05 16:31:49 -04:00
Nick Elser
29da8cf090 Add changelog entry, remove spurious gemspec entry. 2018-10-05 16:31:33 -04:00
Nick Elser
8ed488f071 Merge pull request #5 from keylimetoolbox/double-initial-set
Fix #initial_set which is causing a double attempt and delay on lock acquisition and incorrect drop on short acquisition_timeout
2018-10-05 13:28:16 -07:00
Nick Elser
152b6acf9c Merge pull request #7 from nickelser/update_rubocop
Update Rubocop, and bump the version.
2018-10-05 13:25:44 -07:00
Nick Elser
5e10afe534 Update Rubocop, and bump the version. 2018-10-05 16:14:49 -04:00
Nick Elser
0423eb9e12 Merge pull request #6 from GandalftheGUI/ian_remillard/remove_keys_after_last_lock_released
Add 'Time To Live' to mitigate potential memory leak
2018-10-05 13:06:29 -07:00
Ian Remillard
ca46f5f369 add default for expire on set 2018-10-01 10:48:50 -07:00
Ian Remillard
1022a6f9d3 move to expire only when all locks are released 2018-10-01 10:35:30 -07:00
Ian Remillard
6be3a5bdda edits to docs 2018-09-28 16:43:44 -07:00
Ian Remillard
aa4da5d739 update docs for new options 2018-09-28 13:19:01 -07:00
Ian Remillard
fdb0b7f9d5 adds lock ttl and lock_release_removes_key 2018-09-28 12:43:52 -07:00
Jeremy Wadsack
a13edcf7d1 Fix #initial_set which is causing a double attempt and delay on lock acquisition
The call to `#initial_set` in `#retry` and `#acquire_lock` is followed by `next` which leads to a second pass through the `#retry_with_timeout` loop and a sleep call for up to `:acquisition_delay`. This delay isn't necessary if the value can be set without a race condition.

Removing the `next` call causes the client to continue to retry because the transaction has been changed outside the transaction boundary:

In Redis, calling `SET` within a `WATCH`/`UNWATCH` block but not inside a `MULTI`/`EXEC` block will [cause the EXEC to fail the transaction](https://github.com/antirez/redis-doc/issues/734), so the first `#set` call fails and it requires a second pass. To resolve this I changed `#initial_set` to call `#set` within a `MULTI` block so that it would be inside the transaction.

In Memcache the call to `SET` without the `CAS` during `#initial_set` is going to cause the `SET` with `CAS` to fail (return `EXISTS`), and resulting in a second pass. To resolve this I changed `#initial_set` to use `SET` with `CAS` and return the CAS value to be used in the subsequent `#set` call that stores the lock token.
2018-08-30 13:06:16 -07:00
Nick Elser
af1c476f08 Bump version. 2016-10-06 10:22:36 -07:00
Nick Elser
58fae54022 Minor style fixes. 2016-10-06 10:22:29 -07:00
Nick Elser
2088fd90b3 Merge pull request #1 from Shuttlerock/master
Allow to use custom token for lock
2016-10-06 10:20:35 -07:00
Vokhmin Alexey V
05661e143c Allow to use custom token for lock 2016-10-05 13:47:10 +03:00
Nick Elser
a23282dcc6 don't go around allocating empty strings willy-nilly 2015-05-07 00:16:28 -07:00
Nick Elser
323caaee9b update readme 2015-04-15 23:14:06 -07:00
Nick Elser
745d49466f release v0.3.0 2015-04-15 23:11:06 -07:00
Nick Elser
161d50deb9 update tests for new interface 2015-04-15 23:10:34 -07:00
Nick Elser
81e4a3e143 dramatically simpify interface by forcing key at initialization 2015-04-15 23:10:21 -07:00
Nick Elser
2960c14a4d use same language for summary + description 2015-04-13 22:28:42 -07:00
Nick Elser
308e918e60 on second thought, remove confusing language 2015-04-13 22:20:44 -07:00
Nick Elser
aaee69a2df release v0.2.3 2015-04-13 22:15:14 -07:00
Nick Elser
c6d1c29ada add additional deadlock tests 2015-04-13 22:13:15 -07:00
Nick Elser
14e442e99d remove semaphore language and clarify language 2015-04-13 22:13:07 -07:00
Nick Elser
498073b92e tiny code style improvements 2015-04-13 21:55:34 -07:00
Nick Elser
155a3ac40c release v0.2.2 2015-04-13 21:45:47 -07:00
Nick Elser
10d3ab09cf handle invalid lock data 2015-04-13 21:40:54 -07:00
Nick Elser
2724ec6d9d add another test case for the nil refresh case 2015-04-13 21:32:01 -07:00
Nick Elser
185327f59c fix documentation and add another test for refresh 2015-04-13 21:21:45 -07:00
Nick Elser
c8a972da31 more tests for (still not great) refresh method 2015-04-13 20:29:23 -07:00
Nick Elser
7591c08a28 avoid name collision for locks method 2015-04-13 20:29:03 -07:00
Nick Elser
1dee338e16 slightly more coverage 2015-04-13 19:42:50 -07:00
Nick Elser
900c723043 only report to codeclimate when credentials passed in 2015-04-13 19:37:58 -07:00
Nick Elser
6e2afdf80a add tests for refresh and slight refactor 2015-04-13 19:37:49 -07:00
Nick Elser
49a9757d44 fix refresh token 2015-04-13 19:37:34 -07:00
Nick Elser
754d7d8faf report test coverage 2015-04-13 10:12:26 -07:00
Nick Elser
857fc63378 release 0.2.1 2015-04-12 23:45:09 -07:00
Nick Elser
bb6762bbc6 ret is not always an array 2015-04-12 23:45:00 -07:00
Nick Elser
f0977c89f2 remove redundant require file 2015-04-12 23:03:54 -07:00
Nick Elser
4d5c96309f some style fixes 2015-04-12 22:54:53 -07:00
Nick Elser
8d6061b137 rename some tests, fix rare edge condition 2015-04-12 22:49:02 -07:00
Nick Elser
ce0a4d8d86 release 0.2.0 2015-04-12 22:33:19 -07:00
Nick Elser
1fd769eec2 refactor class methods into instance methods 2015-04-12 22:32:51 -07:00
Nick Elser
37be5ae27b bump version, update changelog 2015-04-12 20:58:18 -07:00
Nick Elser
a1a226fb59 account for variabilities in test memcached 2015-04-12 20:57:52 -07:00
Nick Elser
8d7ddaf35a move logic around loop counts to a more reasonable location 2015-04-12 20:57:29 -07:00
Nick Elser
1aacc0c1a1 properly throw lock LockClientError 2015-04-12 20:47:35 -07:00
Nick Elser
8166c6b51d specify ruby version 2015-04-12 19:53:47 -07:00
Nick Elser
7662743123 update changelog 2015-04-12 19:47:35 -07:00
Nick Elser
a2fa281b86 fix test cases when memcached is delayed 2015-04-12 19:47:23 -07:00
Nick Elser
887219b63d release 0.1.2 2015-04-12 19:43:59 -07:00
Nick Elser
c0905fef91 faster unpack without exceptions 2015-04-12 19:43:07 -07:00
Nick Elser
a54b795e20 simplify defaults logic and fix retry timeouts 2015-04-12 19:42:59 -07:00
Nick Elser
0828cd546b add gem info 2015-04-12 17:54:38 -07:00
Nick Elser
c2b9de4cf3 refactor shared logic into base class 2015-04-12 17:54:30 -07:00
Nick Elser
d4860423fa whitespace 2015-04-12 16:03:53 -07:00
Nick Elser
d8f8350d1c check timeout at the entry of the loop 2015-04-12 15:57:32 -07:00
Nick Elser
1668756a48 update changelog 2015-04-12 15:45:36 -07:00
Nick Elser
74e9e3de75 add build status 2015-04-12 15:41:31 -07:00
Nick Elser
75aad64c08 add minitest dependency 2015-04-12 15:35:37 -07:00
Nick Elser
2eb56a8eaa more gemspec updates 2015-04-12 15:34:05 -07:00
Nick Elser
d260160618 make test the default rake command 2015-04-12 15:31:19 -07:00
Nick Elser
57fad16e4b add license 2015-04-12 15:30:12 -07:00
Nick Elser
30639cae72 use msgpack for efficiency 2015-04-12 15:28:53 -07:00
Nick Elser
89061170ea start servers for travis 2015-04-12 15:28:35 -07:00
18 changed files with 710 additions and 439 deletions

38
.github/workflows/CI.yml vendored Normal file
View File

@@ -0,0 +1,38 @@
name: CI
on:
push:
branches:
- master
pull_request:
jobs:
build:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
ruby:
- '2.5'
- '2.6'
- '2.7'
- '3.0'
- ruby-head
continue-on-error: ${{ matrix.ruby == 'ruby-head' }}
services:
memcached:
image: memcached
ports:
- 11211:11211
redis:
image: redis
ports:
- 6379:6379
steps:
- uses: actions/checkout@v2
- uses: ruby/setup-ruby@v1
with:
ruby-version: ${{ matrix.ruby }}
bundler-cache: true
- run: |
bundle exec rake

View File

@@ -74,7 +74,7 @@ Style/SpaceInsideBrackets:
Style/AndOr:
Enabled: false
Style/TrailingComma:
Style/TrailingCommaInLiteral:
Enabled: true
Style/SpaceBeforeComma:
@@ -98,7 +98,7 @@ Style/SpaceAfterColon:
Style/SpaceAfterComma:
Enabled: true
Style/SpaceAfterControlKeyword:
Style/SpaceAroundKeyword:
Enabled: true
Style/SpaceAfterNot:
@@ -163,7 +163,7 @@ Style/StringLiterals:
EnforcedStyle: double_quotes
Metrics/CyclomaticComplexity:
Max: 8
Max: 10
Metrics/LineLength:
Max: 128
@@ -214,3 +214,6 @@ Metrics/ParameterLists:
Metrics/PerceivedComplexity:
Enabled: false
Style/Documentation:
Enabled: false

View File

@@ -1,3 +0,0 @@
language: ruby
rvm:
- 2.2.0

View File

@@ -1,3 +1,64 @@
## 0.4.0
- Monotonic clock for locks, avoiding issues with DST (thanks @doits)
- Pooled connection support (thanks @mlarraz)
- Switch to Github actions for tests (thanks @mlarraz)
- Update supported Ruby versions (thanks @mlarraz & @pat)
## 0.3.4
- Support for connection pooling when using memcached locks, via `with` blocks using Dalli (thanks to Lev).
## 0.3.3
- Default TTL for keys to allow for short-lived locking keys (thanks to Ian Remillard) without leaking memory.
- Vastly improve initial lock acquisition, especially on Redis (thanks to Jeremy Wadscak).
## 0.3.2
- Custom lock tokens (thanks to avokhmin).
## 0.3.1
- Slight memory leak fix.
## 0.3.0
- Dramatically simplify the interface by forcing clients to specify the key & resources at lock initialization instead of every method call.
## 0.2.3
- Clarify documentation further with respect to semaphores.
## 0.2.2
- Fix bug with refresh - typo would've prevented real use.
- Clean up code.
- Improve documentation a bit.
- 100% test coverage.
## 0.2.1
- Fix bug when dealing with real-world Redis error conditions.
## 0.2.0
- Refactor class methods into instance methods to simplify implementation.
- Increase thread safety with Memcached implementation.
## 0.1.3
- Properly throw Suo::LockClientError when the connection itself fails (Memcache server not reachable, etc.)
## 0.1.2
- Fix retry_timeout to properly use the full time (was being calculated incorrectly).
- Refactor client implementations to re-use more code.
## 0.1.1
- Use [MessagePack](https://github.com/msgpack/msgpack-ruby) for lock serialization.
## 0.1.0
- First release
- First release.

22
LICENSE.txt Normal file
View File

@@ -0,0 +1,22 @@
Copyright (c) 2015 Nick Elser
MIT License
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View File

@@ -1,8 +1,8 @@
# Suo
# Suo [![Build Status](https://github.com/nickelser/suo/workflows/CI/badge.svg)](https://github.com/nickelser/suo/actions?query=workflow%3ACI) [![Code Climate](https://codeclimate.com/github/nickelser/suo/badges/gpa.svg)](https://codeclimate.com/github/nickelser/suo) [![Gem Version](https://badge.fury.io/rb/suo.svg)](http://badge.fury.io/rb/suo)
:lock: Distributed semaphores using Memcached or Redis in Ruby.
Suo provides a very performant distributed lock solution using Compare-And-Set (`CAS`) commands in Memcached, and `WATCH/MULTI` in Redis.
Suo provides a very performant distributed lock solution using Compare-And-Set (`CAS`) commands in Memcached, and `WATCH/MULTI` in Redis. It allows locking both single exclusion (like a mutex - sharing one resource), as well as multiple resources.
## Installation
@@ -18,35 +18,75 @@ gem 'suo'
```ruby
# Memcached
suo = Suo::Client::Memcached.new(connection: "127.0.0.1:11211")
suo = Suo::Client::Memcached.new("foo_resource", connection: "127.0.0.1:11211")
# Redis
suo = Suo::Client::Redis.new(connection: {host: "10.0.1.1"})
suo = Suo::Client::Redis.new("baz_resource", connection: {host: "10.0.1.1"})
# Pre-existing client
suo = Suo::Client::Memcached.new(client: some_dalli_client)
suo = Suo::Client::Memcached.new("bar_resource", client: some_dalli_client)
suo.lock("some_key") do
suo.lock do
# critical code here
@puppies.pet!
end
2.times do
Thread.new do
# second argument is the number of resources - so this will run twice
suo.lock("other_key", 2, timeout: 0.5) { puts "Will run twice!" }
# The resources argument is the number of resources the semaphore will allow to lock (defaulting to one - a mutex)
suo = Suo::Client::Memcached.new("bar_resource", client: some_dalli_client, resources: 2)
Thread.new { suo.lock { puts "One"; sleep 2 } }
Thread.new { suo.lock { puts "Two"; sleep 2 } }
Thread.new { suo.lock { puts "Three" } }
# will print "One" "Two", but not "Three", as there are only 2 resources
# custom acquisition timeouts (time to acquire)
suo = Suo::Client::Memcached.new("protected_key", client: some_dalli_client, acquisition_timeout: 1) # in seconds
# manually locking/unlocking
# the return value from lock without a block is a unique token valid only for the current lock
# which must be unlocked manually
token = suo.lock
foo.baz!
suo.unlock(token)
# custom stale lock expiration (cleaning of dead locks)
suo = Suo::Client::Redis.new("other_key", client: some_redis_client, stale_lock_expiration: 60*5)
```
### Stale locks
"Stale locks" - those acquired more than `stale_lock_expiration` (defaulting to 3600 or one hour) ago - are automatically cleared during any operation on the key (`lock`, `unlock`, `refresh`). The `locked?` method will not return true if only stale locks exist, but will not modify the key itself.
To re-acquire a lock in the middle of a block, you can use the refresh method on client.
```ruby
suo = Suo::Client::Redis.new("foo")
# lock is the same token as seen in the manual example, above
suo.lock do |token|
5.times do
baz.bar!
suo.refresh(token)
end
end
```
### Time To Live
```ruby
Suo::Client::Redis.new("bar_resource", ttl: 60) #ttl in seconds
```
A key representing a set of lockable resources is removed once the last resource lock is released and the `ttl` time runs out. When another lock is acquired and the key has been removed the key has to be recreated.
## TODO
- better stale key handling (refresh blocks)
- more race condition tests
- refactor clients to re-use more code
## History
View the [changelog](https://github.com/nickelser/suo/blob/master/CHANGELOG.md)
View the [changelog](https://github.com/nickelser/suo/blob/master/CHANGELOG.md).
## Contributing

View File

@@ -1,7 +1,8 @@
require "bundler/gem_tasks"
require "rake/testtask"
task default: :test
Rake::TestTask.new do |t|
t.libs << "test"
t.pattern = "test/*_test.rb"
t.pattern = "test/**/*_test.rb"
end

View File

@@ -1,2 +1,15 @@
require "securerandom"
require "monitor"
require "dalli"
require "redis"
require "msgpack"
require "suo/version"
require "suo/clients"
require "suo/errors"
require "suo/client/base"
require "suo/client/memcached"
require "suo/client/redis"

View File

@@ -2,114 +2,187 @@ module Suo
module Client
class Base
DEFAULT_OPTIONS = {
retry_count: 3,
retry_delay: 0.01,
stale_lock_expiration: 3600
acquisition_timeout: 0.1,
acquisition_delay: 0.01,
stale_lock_expiration: 3600,
resources: 1,
ttl: 60,
}.freeze
def initialize(options = {})
@options = self.class.merge_defaults(options).merge(_initialized: true)
BLANK_STR = "".freeze
attr_accessor :client, :key, :resources, :options
include MonitorMixin
def initialize(key, options = {})
fail "Client required" unless options[:client]
@options = DEFAULT_OPTIONS.merge(options)
@retry_count = (@options[:acquisition_timeout] / @options[:acquisition_delay].to_f).ceil
@client = @options[:client]
@resources = @options[:resources].to_i
@key = key
super() # initialize Monitor mixin for thread safety
end
def lock(key, resources = 1, options = {})
options = self.class.merge_defaults(@options.merge(options))
token = self.class.lock(key, resources, options)
def lock(custom_token = nil)
token = acquire_lock(custom_token)
if token
if block_given? && token
begin
yield if block_given?
yield
ensure
self.class.unlock(key, token, options)
unlock(token)
end
true
else
false
token
end
end
def locked?(key, resources = 1)
self.class.locked?(key, resources, @options)
def locked?
locks.size >= resources
end
class << self
def lock(key, resources = 1, options = {}) # rubocop:disable Lint/UnusedMethodArgument
fail NotImplementedError
def locks
val, _ = get
cleared_locks = deserialize_and_clear_locks(val)
cleared_locks
end
def refresh(token)
retry_with_timeout do
val, cas = get
cas = initial_set if val.nil?
cleared_locks = deserialize_and_clear_locks(val)
refresh_lock(cleared_locks, token)
break if set(serialize_locks(cleared_locks), cas, expire: cleared_locks.empty?)
end
end
def locked?(key, resources = 1, options = {})
options = merge_defaults(options)
client = options[:client]
locks = deserialize_locks(client.get(key))
def unlock(token)
return unless token
locks.size >= resources
retry_with_timeout do
val, cas = get
break if val.nil?
cleared_locks = deserialize_and_clear_locks(val)
acquisition_lock = remove_lock(cleared_locks, token)
break unless acquisition_lock
break if set(serialize_locks(cleared_locks), cas, expire: cleared_locks.empty?)
end
rescue LockClientError => _ # rubocop:disable Lint/HandleExceptions
# ignore - assume success due to optimistic locking
end
def locks(key, options)
options = merge_defaults(options)
client = options[:client]
locks = deserialize_locks(client.get(key))
def clear
fail NotImplementedError
end
locks.size
end
private
def refresh(key, acquisition_token, options = {}) # rubocop:disable Lint/UnusedMethodArgument
fail NotImplementedError
end
attr_accessor :retry_count
def unlock(key, acquisition_token, options = {}) # rubocop:disable Lint/UnusedMethodArgument
fail NotImplementedError
end
def acquire_lock(token = nil)
token ||= SecureRandom.base64(16)
def clear(key, options = {}) # rubocop:disable Lint/UnusedMethodArgument
fail NotImplementedError
end
retry_with_timeout do
val, cas = get
def merge_defaults(options = {})
unless options[:_initialized]
options = self::DEFAULT_OPTIONS.merge(options)
cas = initial_set if val.nil?
fail "Client required" unless options[:client]
end
cleared_locks = deserialize_and_clear_locks(val)
if options[:retry_timeout]
options[:retry_count] = (options[:retry_timeout] / options[:retry_delay].to_f).floor
end
if cleared_locks.size < resources
add_lock(cleared_locks, token)
options
end
newval = serialize_locks(cleared_locks)
private
def serialize_locks(locks)
locks.map { |time, token| [time.to_f, token].join(":") }.join(",")
end
def deserialize_locks(str)
str.split(",").map do |s|
time, token = s.split(":", 2)
[Time.at(time.to_f), token]
return token if set(newval, cas)
end
end
def clear_expired_locks(locks, options)
expired = Time.now - options[:stale_lock_expiration]
locks.reject { |time, _| time < expired }
end
nil
end
def add_lock(locks, token)
locks << [Time.now.to_f, token]
end
def get
fail NotImplementedError
end
def remove_lock(locks, acquisition_token)
lock = locks.find { |_, token| token == acquisition_token }
locks.delete(lock)
end
def set(newval, cas) # rubocop:disable Lint/UnusedMethodArgument
fail NotImplementedError
end
def refresh_lock(locks, acquisition_token)
remove_lock(locks, acquisition_token)
add_lock(locks, token)
def initial_set(val = BLANK_STR) # rubocop:disable Lint/UnusedMethodArgument
fail NotImplementedError
end
def synchronize
mon_synchronize { yield }
end
def retry_with_timeout
start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
retry_count.times do
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
break if elapsed >= options[:acquisition_timeout]
synchronize do
yield
end
sleep(rand(options[:acquisition_delay] * 1000).to_f / 1000)
end
rescue => _
raise LockClientError
end
def serialize_locks(locks)
MessagePack.pack(locks.map { |time, token| [time.to_f, token] })
end
def deserialize_and_clear_locks(val)
clear_expired_locks(deserialize_locks(val))
end
def deserialize_locks(val)
unpacked = (val.nil? || val == BLANK_STR) ? [] : MessagePack.unpack(val)
unpacked.map do |time, token|
[Time.at(time), token]
end
rescue EOFError, MessagePack::MalformedFormatError => _
[]
end
def clear_expired_locks(locks)
expired = Time.now - options[:stale_lock_expiration]
locks.reject { |time, _| time < expired }
end
def add_lock(locks, token, time = Time.now.to_f)
locks << [time, token]
end
def remove_lock(locks, acquisition_token)
lock = locks.find { |_, token| token == acquisition_token }
locks.delete(lock)
end
def refresh_lock(locks, acquisition_token)
remove_lock(locks, acquisition_token)
add_lock(locks, acquisition_token)
end
end
end

View File

@@ -1,7 +0,0 @@
module Suo
module Client
module Errors
class FailedToAcquireLock < StandardError; end
end
end
end

View File

@@ -1,135 +1,34 @@
module Suo
module Client
class Memcached < Base
def initialize(options = {})
def initialize(key, options = {})
options[:client] ||= Dalli::Client.new(options[:connection] || ENV["MEMCACHE_SERVERS"] || "127.0.0.1:11211")
super
end
class << self
def lock(key, resources = 1, options = {})
options = merge_defaults(options)
acquisition_token = nil
token = SecureRandom.base64(16)
client = options[:client]
def clear
@client.with { |client| client.delete(@key) }
end
begin
start = Time.now.to_f
private
options[:retry_count].times do |i|
val, cas = client.get_cas(key)
def get
@client.with { |client| client.get_cas(@key) }
end
# no key has been set yet; we could simply set it, but would lead to race conditions on the initial setting
if val.nil?
client.set(key, "")
next
end
locks = clear_expired_locks(deserialize_locks(val.to_s), options)
if locks.size < resources
add_lock(locks, token)
newval = serialize_locks(locks)
if client.set_cas(key, newval, cas)
acquisition_token = token
break
end
end
if options[:retry_timeout]
now = Time.now.to_f
break if now - start > options[:retry_timeout]
end
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
end
rescue => _
raise FailedToAcquireLock
end
acquisition_token
def set(newval, cas, expire: false)
if expire
@client.with { |client| client.set_cas(@key, newval, cas, @options[:ttl]) }
else
@client.with { |client| client.set_cas(@key, newval, cas) }
end
end
def refresh(key, acquisition_token, options = {})
options = merge_defaults(options)
client = options[:client]
begin
start = Time.now.to_f
options[:retry_count].times do
val, cas = client.get_cas(key)
# much like with initial set - ensure the key is here
if val.nil?
client.set(key, "")
next
end
locks = clear_expired_locks(deserialize_locks(val), options)
refresh_lock(locks, acquisition_token)
newval = serialize_locks(locks)
break if client.set_cas(key, newval, cas)
if options[:retry_timeout]
now = Time.now.to_f
break if now - start > options[:retry_timeout]
end
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
end
rescue => _
raise FailedToAcquireLock
end
end
def unlock(key, acquisition_token, options = {})
options = merge_defaults(options)
client = options[:client]
return unless acquisition_token
begin
start = Time.now.to_f
options[:retry_count].times do
val, cas = client.get_cas(key)
break if val.nil? # lock has expired totally
locks = clear_expired_locks(deserialize_locks(val), options)
acquisition_lock = remove_lock(locks, acquisition_token)
break unless acquisition_lock
newval = serialize_locks(locks)
break if client.set_cas(key, newval, cas)
# another client cleared a token in the interim - try again!
if options[:retry_timeout]
now = Time.now.to_f
break if now - start > options[:retry_timeout]
end
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
end
rescue => boom # rubocop:disable Lint/HandleExceptions
# since it's optimistic locking - fine if we are unable to release
raise boom if ENV["SUO_TEST"]
end
end
def clear(key, options = {})
options = merge_defaults(options)
options[:client].delete(key)
def initial_set(val = BLANK_STR)
@client.with do |client|
client.set(@key, val)
_val, cas = client.get_cas(@key)
cas
end
end
end

View File

@@ -1,166 +1,54 @@
module Suo
module Client
class Redis < Base
def initialize(options = {})
OK_STR = "OK".freeze
def initialize(key, options = {})
options[:client] ||= ::Redis.new(options[:connection] || {})
super
end
class << self
def lock(key, resources = 1, options = {})
options = merge_defaults(options)
acquisition_token = nil
token = SecureRandom.base64(16)
client = options[:client]
def clear
with { |r| r.del(@key) }
end
begin
start = Time.now.to_f
private
options[:retry_count].times do
client.watch(key) do
begin
val = client.get(key)
locks = clear_expired_locks(deserialize_locks(val.to_s), options)
if locks.size < resources
add_lock(locks, token)
newval = serialize_locks(locks)
ret = client.multi do |multi|
multi.set(key, newval)
end
acquisition_token = token if ret[0] == "OK"
end
ensure
client.unwatch
end
end
break if acquisition_token
if options[:retry_timeout]
now = Time.now.to_f
break if now - start > options[:retry_timeout]
end
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
end
rescue => boom
raise boom
raise Suo::Client::FailedToAcquireLock
end
acquisition_token
def with(&block)
if @client.respond_to?(:with)
@client.with(&block)
else
yield @client
end
end
def refresh(key, acquisition_token, options = {})
options = merge_defaults(options)
client = options[:client]
refreshed = false
def get
[with { |r| r.get(@key) }, nil]
end
begin
start = Time.now.to_f
options[:retry_count].times do
client.watch(key) do
begin
val = client.get(key)
locks = clear_expired_locks(deserialize_locks(val), options)
refresh_lock(locks, acquisition_token)
newval = serialize_locks(locks)
ret = client.multi do |multi|
multi.set(key, newval)
end
refreshed = ret[0] == "OK"
ensure
client.unwatch
end
end
break if refreshed
if options[:retry_timeout]
now = Time.now.to_f
break if now - start > options[:retry_timeout]
end
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
def set(newval, _, expire: false)
ret = with do |r|
r.multi do |rr|
if expire
rr.setex(@key, @options[:ttl], newval)
else
rr.set(@key, newval)
end
rescue => _
raise Suo::Client::FailedToAcquireLock
end
end
def unlock(key, acquisition_token, options = {})
options = merge_defaults(options)
client = options[:client]
ret && ret[0] == OK_STR
end
return unless acquisition_token
def synchronize
with { |r| r.watch(@key) { yield } }
ensure
with { |r| r.unwatch }
end
begin
start = Time.now.to_f
options[:retry_count].times do
cleared = false
client.watch(key) do
begin
val = client.get(key)
if val.nil?
cleared = true
break
end
locks = clear_expired_locks(deserialize_locks(val), options)
acquisition_lock = remove_lock(locks, acquisition_token)
unless acquisition_lock
# token was already cleared
cleared = true
break
end
newval = serialize_locks(locks)
ret = client.multi do |multi|
multi.set(key, newval)
end
cleared = ret[0] == "OK"
ensure
client.unwatch
end
end
break if cleared
if options[:retry_timeout]
now = Time.now.to_f
break if now - start > options[:retry_timeout]
end
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
end
rescue => boom # rubocop:disable Lint/HandleExceptions
# since it's optimistic locking - fine if we are unable to release
raise boom if ENV["SUO_TEST"]
end
end
def clear(key, options = {})
options = merge_defaults(options)
options[:client].del(key)
end
def initial_set(val = BLANK_STR)
set(val, nil)
nil
end
end
end

View File

@@ -1,12 +0,0 @@
require "securerandom"
require "monitor"
require "dalli"
require "dalli/cas/client"
require "redis"
require "suo/client/errors"
require "suo/client/base"
require "suo/client/memcached"
require "suo/client/redis"

3
lib/suo/errors.rb Normal file
View File

@@ -0,0 +1,3 @@
module Suo
class LockClientError < StandardError; end
end

View File

@@ -1,3 +1,3 @@
module Suo
VERSION = "0.1.0"
VERSION = "0.4.0".freeze
end

View File

@@ -1,7 +1,7 @@
# coding: utf-8
lib = File.expand_path('../lib', __FILE__)
lib = File.expand_path("../lib", __FILE__)
$LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib)
require 'suo/version'
require "suo/version"
Gem::Specification.new do |spec|
spec.name = "suo"
@@ -9,20 +9,25 @@ Gem::Specification.new do |spec|
spec.authors = ["Nick Elser"]
spec.email = ["nick.elser@gmail.com"]
spec.summary = %q(Distributed semaphores using Memcached or Redis.)
# spec.description = %q{TODO: Long description}
spec.summary = %q(Distributed locks (mutexes & semaphores) using Memcached or Redis.)
spec.description = %q(Distributed locks (mutexes & semaphores) using Memcached or Redis.)
spec.homepage = "https://github.com/nickelser/suo"
spec.license = "MIT"
spec.files = `git ls-files -z`.split("\x0").reject { |f| f.match(%r{^(test|spec|features)/}) }
spec.files = `git ls-files -z`.split("\x0")
spec.bindir = "bin"
spec.executables = spec.files.grep(%r{^exe/}) { |f| File.basename(f) }
spec.test_files = spec.files.grep(%r{^(test|spec|features)/})
spec.require_paths = ["lib"]
spec.required_ruby_version = ">= 2.5"
spec.add_dependency "dalli"
spec.add_dependency "redis"
spec.add_dependency "msgpack"
spec.add_development_dependency "bundler", "~> 1.5"
spec.add_development_dependency "rake", "~> 10.0"
spec.add_development_dependency "rubocop", "~> 0.30.0"
spec.add_development_dependency "bundler"
spec.add_development_dependency "rake", "~> 13.0"
spec.add_development_dependency "rubocop", "~> 0.49.0"
spec.add_development_dependency "minitest", "~> 5.5.0"
spec.add_development_dependency "codeclimate-test-reporter", "~> 0.4.7"
end

View File

@@ -3,118 +3,361 @@ require "test_helper"
TEST_KEY = "suo_test_key".freeze
module ClientTests
def test_requires_client
exception = assert_raises(RuntimeError) do
@klass.lock(TEST_KEY, 1)
end
assert_equal "Client required", exception.message
def client(options = {})
@client.class.new(options[:key] || TEST_KEY, options.merge(client: @client.client))
end
def test_class_single_resource_locking
lock1 = @klass.lock(TEST_KEY, 1, client: @klass_client)
def test_throws_failed_error_on_bad_client
assert_raises(Suo::LockClientError) do
client = @client.class.new(TEST_KEY, client: {})
client.lock
end
end
def test_single_resource_locking
lock1 = @client.lock
refute_nil lock1
locked = @klass.locked?(TEST_KEY, 1, client: @klass_client)
locked = @client.locked?
assert_equal true, locked
lock2 = @klass.lock(TEST_KEY, 1, client: @klass_client)
lock2 = @client.lock
assert_nil lock2
@klass.unlock(TEST_KEY, lock1, client: @klass_client)
@client.unlock(lock1)
locked = @client.locked?
locked = @klass.locked?(TEST_KEY, 1, client: @klass_client)
assert_equal false, locked
end
def test_class_multiple_resource_locking
lock1 = @klass.lock(TEST_KEY, 2, client: @klass_client)
def test_lock_with_custom_token
token = 'foo-bar'
lock = @client.lock token
assert_equal lock, token
end
def test_empty_lock_on_invalid_data
@client.send(:initial_set, "bad value")
assert_equal false, @client.locked?
end
def test_clear
lock1 = @client.lock
refute_nil lock1
locked = @klass.locked?(TEST_KEY, 2, client: @klass_client)
assert_equal false, locked
@client.clear
lock2 = @klass.lock(TEST_KEY, 2, client: @klass_client)
assert_equal false, @client.locked?
end
def test_multiple_resource_locking
@client = client(resources: 2)
lock1 = @client.lock
refute_nil lock1
assert_equal false, @client.locked?
lock2 = @client.lock
refute_nil lock2
locked = @klass.locked?(TEST_KEY, 2, client: @klass_client)
assert_equal true, locked
assert_equal true, @client.locked?
@klass.unlock(TEST_KEY, lock1, client: @klass_client)
@client.unlock(lock1)
locked = @klass.locked?(TEST_KEY, 1, client: @klass_client)
assert_equal true, locked
assert_equal false, @client.locked?
@klass.unlock(TEST_KEY, lock2, client: @klass_client)
assert_equal 1, @client.locks.size
locked = @klass.locked?(TEST_KEY, 1, client: @klass_client)
assert_equal false, locked
@client.unlock(lock2)
assert_equal false, @client.locked?
assert_equal 0, @client.locks.size
end
def test_instance_single_resource_locking
def test_block_single_resource_locking
locked = false
@client.lock(TEST_KEY, 1) { locked = true }
@client.lock { locked = true }
assert_equal true, locked
end
def test_instance_unlocks_on_exception
def test_block_unlocks_on_exception
assert_raises(RuntimeError) do
@client.lock(TEST_KEY, 1) { fail "Test" }
@client.lock{ fail "Test" }
end
locked = @klass.locked?(TEST_KEY, 1, client: @klass_client)
assert_equal false, locked
assert_equal false, @client.locked?
end
def test_instance_multiple_resource_locking
def test_readme_example
output = Queue.new
@client = client(resources: 2)
threads = []
threads << Thread.new { @client.lock { output << "One"; sleep 0.5 } }
threads << Thread.new { @client.lock { output << "Two"; sleep 0.5 } }
sleep 0.1
threads << Thread.new { @client.lock { output << "Three" } }
threads.each(&:join)
ret = []
ret << (output.size > 0 ? output.pop : nil)
ret << (output.size > 0 ? output.pop : nil)
ret.sort!
assert_equal 0, output.size
assert_equal %w(One Two), ret
assert_equal false, @client.locked?
end
def test_block_multiple_resource_locking
success_counter = Queue.new
failure_counter = Queue.new
@client = client(acquisition_timeout: 0.9, resources: 50)
100.times.map do |i|
Thread.new do
success = @client.lock(TEST_KEY, 50, retry_timeout: 0.9) do
sleep(1)
success = @client.lock do
sleep(3)
success_counter << i
end
failure_counter << i unless success
end
end.map(&:join)
end.each(&:join)
assert_equal 50, success_counter.size
assert_equal 50, failure_counter.size
assert_equal false, @client.locked?
end
def test_instance_multiple_resource_locking_longer_timeout
def test_block_multiple_resource_locking_longer_timeout
success_counter = Queue.new
failure_counter = Queue.new
@client = client(acquisition_timeout: 3, resources: 50)
100.times.map do |i|
Thread.new do
success = @client.lock(TEST_KEY, 50, retry_timeout: 2) do
sleep(1)
success = @client.lock do
sleep(0.5)
success_counter << i
end
failure_counter << i unless success
end
end.map(&:join)
end.each(&:join)
assert_equal 100, success_counter.size
assert_equal 0, failure_counter.size
assert_equal false, @client.locked?
end
def test_unstale_lock_acquisition
success_counter = Queue.new
failure_counter = Queue.new
@client = client(stale_lock_expiration: 0.5)
t1 = Thread.new { @client.lock { sleep 0.6; success_counter << 1 } }
sleep 0.3
t2 = Thread.new do
locked = @client.lock { success_counter << 1 }
failure_counter << 1 unless locked
end
[t1, t2].each(&:join)
assert_equal 1, success_counter.size
assert_equal 1, failure_counter.size
assert_equal false, @client.locked?
end
def test_stale_lock_acquisition
success_counter = Queue.new
failure_counter = Queue.new
@client = client(stale_lock_expiration: 0.5)
t1 = Thread.new { @client.lock { sleep 0.6; success_counter << 1 } }
sleep 0.55
t2 = Thread.new do
locked = @client.lock { success_counter << 1 }
failure_counter << 1 unless locked
end
[t1, t2].each(&:join)
assert_equal 2, success_counter.size
assert_equal 0, failure_counter.size
assert_equal false, @client.locked?
end
def test_refresh
@client = client(stale_lock_expiration: 0.5)
lock1 = @client.lock
assert_equal true, @client.locked?
@client.refresh(lock1)
assert_equal true, @client.locked?
sleep 0.55
assert_equal false, @client.locked?
lock2 = @client.lock
@client.refresh(lock1)
assert_equal true, @client.locked?
@client.unlock(lock1)
# edge case with refresh lock in the middle
assert_equal true, @client.locked?
@client.clear
assert_equal false, @client.locked?
@client.refresh(lock2)
assert_equal true, @client.locked?
@client.unlock(lock2)
# now finally unlocked
assert_equal false, @client.locked?
end
def test_block_refresh
success_counter = Queue.new
failure_counter = Queue.new
@client = client(stale_lock_expiration: 0.5)
t1 = Thread.new do
@client.lock do |token|
sleep 0.6
@client.refresh(token)
sleep 1
success_counter << 1
end
end
t2 = Thread.new do
sleep 0.8
locked = @client.lock { success_counter << 1 }
failure_counter << 1 unless locked
end
[t1, t2].each(&:join)
assert_equal 1, success_counter.size
assert_equal 1, failure_counter.size
assert_equal false, @client.locked?
end
def test_refresh_multi
success_counter = Queue.new
failure_counter = Queue.new
@client = client(stale_lock_expiration: 0.5, resources: 2)
t1 = Thread.new do
@client.lock do |token|
sleep 0.4
@client.refresh(token)
success_counter << 1
sleep 0.5
end
end
t2 = Thread.new do
sleep 0.55
locked = @client.lock do
success_counter << 1
sleep 0.5
end
failure_counter << 1 unless locked
end
t3 = Thread.new do
sleep 0.75
locked = @client.lock { success_counter << 1 }
failure_counter << 1 unless locked
end
[t1, t2, t3].each(&:join)
assert_equal 2, success_counter.size
assert_equal 1, failure_counter.size
assert_equal false, @client.locked?
end
def test_increment_reused_client
i = 0
threads = 2.times.map do
Thread.new do
@client.lock { i += 1 }
end
end
threads.each(&:join)
assert_equal 2, i
assert_equal false, @client.locked?
end
def test_increment_new_client
i = 0
threads = 2.times.map do
Thread.new do
# note this is the method that generates a *new* client
client.lock { i += 1 }
end
end
threads.each(&:join)
assert_equal 2, i
assert_equal false, @client.locked?
end
end
class TestBaseClient < Minitest::Test
def setup
@klass = Suo::Client::Base
@client = Suo::Client::Base.new(TEST_KEY, client: {})
end
def test_not_implemented
assert_raises(NotImplementedError) do
@klass.lock(TEST_KEY, 1)
@client.send(:get)
end
assert_raises(NotImplementedError) do
@client.send(:set, "", "")
end
assert_raises(NotImplementedError) do
@client.send(:initial_set)
end
assert_raises(NotImplementedError) do
@client.send(:clear)
end
end
end
@@ -123,13 +366,13 @@ class TestMemcachedClient < Minitest::Test
include ClientTests
def setup
@klass = Suo::Client::Memcached
@client = @klass.new
@klass_client = Dalli::Client.new("127.0.0.1:11211")
@dalli = Dalli::Client.new("127.0.0.1:11211")
@client = Suo::Client::Memcached.new(TEST_KEY)
teardown
end
def teardown
@klass_client.delete(TEST_KEY)
@dalli.delete(TEST_KEY)
end
end
@@ -137,13 +380,13 @@ class TestRedisClient < Minitest::Test
include ClientTests
def setup
@klass = Suo::Client::Redis
@client = @klass.new
@klass_client = Redis.new
@redis = Redis.new
@client = Suo::Client::Redis.new(TEST_KEY)
teardown
end
def teardown
@klass_client.del(TEST_KEY)
@redis.del(TEST_KEY)
end
end

View File

@@ -1,9 +1,13 @@
$LOAD_PATH.unshift File.expand_path("../../lib", __FILE__)
if ENV["CODECLIMATE_REPO_TOKEN"]
require "codeclimate-test-reporter"
CodeClimate::TestReporter.start
end
require "suo"
require "thread"
require "minitest/autorun"
require "minitest/benchmark"
ENV["SUO_TEST"] = "true"