17 Commits

Author SHA1 Message Date
Nick Elser
7662743123 update changelog 2015-04-12 19:47:35 -07:00
Nick Elser
a2fa281b86 fix test cases when memcached is delayed 2015-04-12 19:47:23 -07:00
Nick Elser
887219b63d release 0.1.2 2015-04-12 19:43:59 -07:00
Nick Elser
c0905fef91 faster unpack without exceptions 2015-04-12 19:43:07 -07:00
Nick Elser
a54b795e20 simplify defaults logic and fix retry timeouts 2015-04-12 19:42:59 -07:00
Nick Elser
0828cd546b add gem info 2015-04-12 17:54:38 -07:00
Nick Elser
c2b9de4cf3 refactor shared logic into base class 2015-04-12 17:54:30 -07:00
Nick Elser
d4860423fa whitespace 2015-04-12 16:03:53 -07:00
Nick Elser
d8f8350d1c check timeout at the entry of the loop 2015-04-12 15:57:32 -07:00
Nick Elser
1668756a48 update changelog 2015-04-12 15:45:36 -07:00
Nick Elser
74e9e3de75 add build status 2015-04-12 15:41:31 -07:00
Nick Elser
75aad64c08 add minitest dependency 2015-04-12 15:35:37 -07:00
Nick Elser
2eb56a8eaa more gemspec updates 2015-04-12 15:34:05 -07:00
Nick Elser
d260160618 make test the default rake command 2015-04-12 15:31:19 -07:00
Nick Elser
57fad16e4b add license 2015-04-12 15:30:12 -07:00
Nick Elser
30639cae72 use msgpack for efficiency 2015-04-12 15:28:53 -07:00
Nick Elser
89061170ea start servers for travis 2015-04-12 15:28:35 -07:00
12 changed files with 215 additions and 313 deletions

View File

@@ -1,3 +1,6 @@
language: ruby language: ruby
rvm: rvm:
- 2.2.0 - 2.2.0
services:
- memcached
- redis-server

View File

@@ -1,3 +1,12 @@
## 0.1.2
- Fix retry_timeout to properly use the full time (was being calculated incorrectly).
- Refactor client implementations to re-use more code.
## 0.1.1
- Use [MessagePack](https://github.com/msgpack/msgpack-ruby) for semaphore serialization.
## 0.1.0 ## 0.1.0
- First release - First release.

22
LICENSE.txt Normal file
View File

@@ -0,0 +1,22 @@
Copyright (c) 2015 Nick Elser
MIT License
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View File

@@ -1,4 +1,4 @@
# Suo # Suo [![Build Status](https://travis-ci.org/nickelser/suo.svg?branch=master)](https://travis-ci.org/nickelser/suo) [![Gem Version](https://badge.fury.io/rb/suo.svg)](http://badge.fury.io/rb/suo)
:lock: Distributed semaphores using Memcached or Redis in Ruby. :lock: Distributed semaphores using Memcached or Redis in Ruby.
@@ -42,7 +42,6 @@ end
## TODO ## TODO
- better stale key handling (refresh blocks) - better stale key handling (refresh blocks)
- more race condition tests - more race condition tests
- refactor clients to re-use more code
## History ## History

View File

@@ -1,7 +1,8 @@
require "bundler/gem_tasks" require "bundler/gem_tasks"
require "rake/testtask" require "rake/testtask"
task default: :test
Rake::TestTask.new do |t| Rake::TestTask.new do |t|
t.libs << "test" t.libs << "test"
t.pattern = "test/*_test.rb" t.pattern = "test/**/*_test.rb"
end end

View File

@@ -2,13 +2,13 @@ module Suo
module Client module Client
class Base class Base
DEFAULT_OPTIONS = { DEFAULT_OPTIONS = {
retry_count: 3, retry_timeout: 0.1,
retry_delay: 0.01, retry_delay: 0.01,
stale_lock_expiration: 3600 stale_lock_expiration: 3600
}.freeze }.freeze
def initialize(options = {}) def initialize(options = {})
@options = self.class.merge_defaults(options).merge(_initialized: true) @options = self.class.merge_defaults(options)
end end
def lock(key, resources = 1, options = {}) def lock(key, resources = 1, options = {})
@@ -33,32 +33,86 @@ module Suo
end end
class << self class << self
def lock(key, resources = 1, options = {}) # rubocop:disable Lint/UnusedMethodArgument def lock(key, resources = 1, options = {})
fail NotImplementedError options = merge_defaults(options)
acquisition_token = nil
token = SecureRandom.base64(16)
retry_with_timeout(key, options) do
val, cas = get(key, options)
if val.nil?
set_initial(key, options)
next
end
locks = deserialize_and_clear_locks(val, options)
if locks.size < resources
add_lock(locks, token)
newval = serialize_locks(locks)
if set(key, newval, cas, options)
acquisition_token = token
break
end
end
end
acquisition_token
end end
def locked?(key, resources = 1, options = {}) def locked?(key, resources = 1, options = {})
options = merge_defaults(options) locks(key, options).size >= resources
client = options[:client]
locks = deserialize_locks(client.get(key))
locks.size >= resources
end end
def locks(key, options) def locks(key, options)
options = merge_defaults(options) options = merge_defaults(options)
client = options[:client] val, _ = get(key, options)
locks = deserialize_locks(client.get(key)) locks = deserialize_locks(val)
locks.size locks
end end
def refresh(key, acquisition_token, options = {}) # rubocop:disable Lint/UnusedMethodArgument def refresh(key, acquisition_token, options = {})
fail NotImplementedError options = merge_defaults(options)
retry_with_timeout(key, options) do
val, cas = get(key, options)
if val.nil?
set_initial(key, options)
next
end
locks = deserialize_and_clear_locks(val, options)
refresh_lock(locks, acquisition_token)
break if set(key, serialize_locks(locks), cas, options)
end
end end
def unlock(key, acquisition_token, options = {}) # rubocop:disable Lint/UnusedMethodArgument def unlock(key, acquisition_token, options = {})
fail NotImplementedError options = merge_defaults(options)
return unless acquisition_token
retry_with_timeout(key, options) do
val, cas = get(key, options)
break if val.nil?
locks = deserialize_and_clear_locks(val, options)
acquisition_lock = remove_lock(locks, acquisition_token)
break unless acquisition_lock
break if set(key, serialize_locks(locks), cas, options)
end
rescue FailedToAcquireLock => _ # rubocop:disable Lint/HandleExceptions
# ignore - assume success due to optimistic locking
end end
def clear(key, options = {}) # rubocop:disable Lint/UnusedMethodArgument def clear(key, options = {}) # rubocop:disable Lint/UnusedMethodArgument
@@ -66,30 +120,68 @@ module Suo
end end
def merge_defaults(options = {}) def merge_defaults(options = {})
unless options[:_initialized] options = self::DEFAULT_OPTIONS.merge(options)
options = self::DEFAULT_OPTIONS.merge(options)
fail "Client required" unless options[:client] fail "Client required" unless options[:client]
end
if options[:retry_timeout] options[:retry_count] = (options[:retry_timeout] / options[:retry_delay].to_f).ceil
options[:retry_count] = (options[:retry_timeout] / options[:retry_delay].to_f).floor
end
options options
end end
private private
def serialize_locks(locks) def get(key, options) # rubocop:disable Lint/UnusedMethodArgument
locks.map { |time, token| [time.to_f, token].join(":") }.join(",") fail NotImplementedError
end end
def deserialize_locks(str) def set(key, newval, oldval, options) # rubocop:disable Lint/UnusedMethodArgument
str.split(",").map do |s| fail NotImplementedError
time, token = s.split(":", 2) end
[Time.at(time.to_f), token]
def set_initial(key, options) # rubocop:disable Lint/UnusedMethodArgument
fail NotImplementedError
end
def synchronize(key, options)
yield(key, options)
end
def retry_with_timeout(key, options)
start = Time.now.to_f
options[:retry_count].times do
if options[:retry_timeout]
now = Time.now.to_f
break if now - start > options[:retry_timeout]
end
synchronize(key, options) do
yield
end
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
end end
rescue => _
raise FailedToAcquireLock
end
def serialize_locks(locks)
MessagePack.pack(locks.map { |time, token| [time.to_f, token] })
end
def deserialize_and_clear_locks(val, options)
clear_expired_locks(deserialize_locks(val), options)
end
def deserialize_locks(val)
unpacked = (val.nil? || val == "") ? [] : MessagePack.unpack(val)
unpacked.map do |time, token|
[Time.at(time), token]
end
rescue EOFError => _
[]
end end
def clear_expired_locks(locks, options) def clear_expired_locks(locks, options)

View File

@@ -7,130 +7,24 @@ module Suo
end end
class << self class << self
def lock(key, resources = 1, options = {})
options = merge_defaults(options)
acquisition_token = nil
token = SecureRandom.base64(16)
client = options[:client]
begin
start = Time.now.to_f
options[:retry_count].times do |i|
val, cas = client.get_cas(key)
# no key has been set yet; we could simply set it, but would lead to race conditions on the initial setting
if val.nil?
client.set(key, "")
next
end
locks = clear_expired_locks(deserialize_locks(val.to_s), options)
if locks.size < resources
add_lock(locks, token)
newval = serialize_locks(locks)
if client.set_cas(key, newval, cas)
acquisition_token = token
break
end
end
if options[:retry_timeout]
now = Time.now.to_f
break if now - start > options[:retry_timeout]
end
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
end
rescue => _
raise FailedToAcquireLock
end
acquisition_token
end
def refresh(key, acquisition_token, options = {})
options = merge_defaults(options)
client = options[:client]
begin
start = Time.now.to_f
options[:retry_count].times do
val, cas = client.get_cas(key)
# much like with initial set - ensure the key is here
if val.nil?
client.set(key, "")
next
end
locks = clear_expired_locks(deserialize_locks(val), options)
refresh_lock(locks, acquisition_token)
newval = serialize_locks(locks)
break if client.set_cas(key, newval, cas)
if options[:retry_timeout]
now = Time.now.to_f
break if now - start > options[:retry_timeout]
end
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
end
rescue => _
raise FailedToAcquireLock
end
end
def unlock(key, acquisition_token, options = {})
options = merge_defaults(options)
client = options[:client]
return unless acquisition_token
begin
start = Time.now.to_f
options[:retry_count].times do
val, cas = client.get_cas(key)
break if val.nil? # lock has expired totally
locks = clear_expired_locks(deserialize_locks(val), options)
acquisition_lock = remove_lock(locks, acquisition_token)
break unless acquisition_lock
newval = serialize_locks(locks)
break if client.set_cas(key, newval, cas)
# another client cleared a token in the interim - try again!
if options[:retry_timeout]
now = Time.now.to_f
break if now - start > options[:retry_timeout]
end
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
end
rescue => boom # rubocop:disable Lint/HandleExceptions
# since it's optimistic locking - fine if we are unable to release
raise boom if ENV["SUO_TEST"]
end
end
def clear(key, options = {}) def clear(key, options = {})
options = merge_defaults(options) options = merge_defaults(options)
options[:client].delete(key) options[:client].delete(key)
end end
private
def get(key, options)
options[:client].get_cas(key)
end
def set(key, newval, cas, options)
options[:client].set_cas(key, newval, cas)
end
def set_initial(key, options)
options[:client].set(key, "")
end
end end
end end
end end

View File

@@ -7,160 +7,36 @@ module Suo
end end
class << self class << self
def lock(key, resources = 1, options = {})
options = merge_defaults(options)
acquisition_token = nil
token = SecureRandom.base64(16)
client = options[:client]
begin
start = Time.now.to_f
options[:retry_count].times do
client.watch(key) do
begin
val = client.get(key)
locks = clear_expired_locks(deserialize_locks(val.to_s), options)
if locks.size < resources
add_lock(locks, token)
newval = serialize_locks(locks)
ret = client.multi do |multi|
multi.set(key, newval)
end
acquisition_token = token if ret[0] == "OK"
end
ensure
client.unwatch
end
end
break if acquisition_token
if options[:retry_timeout]
now = Time.now.to_f
break if now - start > options[:retry_timeout]
end
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
end
rescue => boom
raise boom
raise Suo::Client::FailedToAcquireLock
end
acquisition_token
end
def refresh(key, acquisition_token, options = {})
options = merge_defaults(options)
client = options[:client]
refreshed = false
begin
start = Time.now.to_f
options[:retry_count].times do
client.watch(key) do
begin
val = client.get(key)
locks = clear_expired_locks(deserialize_locks(val), options)
refresh_lock(locks, acquisition_token)
newval = serialize_locks(locks)
ret = client.multi do |multi|
multi.set(key, newval)
end
refreshed = ret[0] == "OK"
ensure
client.unwatch
end
end
break if refreshed
if options[:retry_timeout]
now = Time.now.to_f
break if now - start > options[:retry_timeout]
end
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
end
rescue => _
raise Suo::Client::FailedToAcquireLock
end
end
def unlock(key, acquisition_token, options = {})
options = merge_defaults(options)
client = options[:client]
return unless acquisition_token
begin
start = Time.now.to_f
options[:retry_count].times do
cleared = false
client.watch(key) do
begin
val = client.get(key)
if val.nil?
cleared = true
break
end
locks = clear_expired_locks(deserialize_locks(val), options)
acquisition_lock = remove_lock(locks, acquisition_token)
unless acquisition_lock
# token was already cleared
cleared = true
break
end
newval = serialize_locks(locks)
ret = client.multi do |multi|
multi.set(key, newval)
end
cleared = ret[0] == "OK"
ensure
client.unwatch
end
end
break if cleared
if options[:retry_timeout]
now = Time.now.to_f
break if now - start > options[:retry_timeout]
end
sleep(rand(options[:retry_delay] * 1000).to_f / 1000)
end
rescue => boom # rubocop:disable Lint/HandleExceptions
# since it's optimistic locking - fine if we are unable to release
raise boom if ENV["SUO_TEST"]
end
end
def clear(key, options = {}) def clear(key, options = {})
options = merge_defaults(options) options = merge_defaults(options)
options[:client].del(key) options[:client].del(key)
end end
private
def get(key, options)
[options[:client].get(key), nil]
end
def set(key, newval, _, options)
ret = options[:client].multi do |multi|
multi.set(key, newval)
end
ret[0] == "OK"
end
def synchronize(key, options)
options[:client].watch(key) do
yield
end
ensure
options[:client].unwatch
end
def set_initial(key, options)
options[:client].set(key, "")
end
end end
end end
end end

View File

@@ -6,6 +6,8 @@ require "dalli/cas/client"
require "redis" require "redis"
require "msgpack"
require "suo/client/errors" require "suo/client/errors"
require "suo/client/base" require "suo/client/base"
require "suo/client/memcached" require "suo/client/memcached"

View File

@@ -1,3 +1,3 @@
module Suo module Suo
VERSION = "0.1.0" VERSION = "0.1.2"
end end

View File

@@ -1,7 +1,7 @@
# coding: utf-8 # coding: utf-8
lib = File.expand_path('../lib', __FILE__) lib = File.expand_path("../lib", __FILE__)
$LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib) $LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib)
require 'suo/version' require "suo/version"
Gem::Specification.new do |spec| Gem::Specification.new do |spec|
spec.name = "suo" spec.name = "suo"
@@ -10,12 +10,14 @@ Gem::Specification.new do |spec|
spec.email = ["nick.elser@gmail.com"] spec.email = ["nick.elser@gmail.com"]
spec.summary = %q(Distributed semaphores using Memcached or Redis.) spec.summary = %q(Distributed semaphores using Memcached or Redis.)
# spec.description = %q{TODO: Long description} spec.description = %q(Distributed semaphores using Memcached or Redis.)
spec.homepage = "https://github.com/nickelser/suo" spec.homepage = "https://github.com/nickelser/suo"
spec.license = "MIT"
spec.files = `git ls-files -z`.split("\x0").reject { |f| f.match(%r{^(test|spec|features)/}) } spec.files = `git ls-files -z`.split("\x0")
spec.bindir = "bin" spec.bindir = "bin"
spec.executables = spec.files.grep(%r{^exe/}) { |f| File.basename(f) } spec.executables = spec.files.grep(%r{^bin/}) { |f| File.basename(f) }
spec.test_files = spec.files.grep(%r{^(test|spec|features)/})
spec.require_paths = ["lib"] spec.require_paths = ["lib"]
spec.add_dependency "dalli" spec.add_dependency "dalli"
@@ -25,4 +27,5 @@ Gem::Specification.new do |spec|
spec.add_development_dependency "bundler", "~> 1.5" spec.add_development_dependency "bundler", "~> 1.5"
spec.add_development_dependency "rake", "~> 10.0" spec.add_development_dependency "rake", "~> 10.0"
spec.add_development_dependency "rubocop", "~> 0.30.0" spec.add_development_dependency "rubocop", "~> 0.30.0"
spec.add_development_dependency "minitest", "~> 5.5.0"
end end

View File

@@ -24,6 +24,7 @@ module ClientTests
@klass.unlock(TEST_KEY, lock1, client: @klass_client) @klass.unlock(TEST_KEY, lock1, client: @klass_client)
locked = @klass.locked?(TEST_KEY, 1, client: @klass_client) locked = @klass.locked?(TEST_KEY, 1, client: @klass_client)
assert_equal false, locked assert_equal false, locked
end end
@@ -74,8 +75,8 @@ module ClientTests
100.times.map do |i| 100.times.map do |i|
Thread.new do Thread.new do
success = @client.lock(TEST_KEY, 50, retry_timeout: 0.9) do success = @client.lock(TEST_KEY, 50, retry_timeout: 0.5) do
sleep(1) sleep(2)
success_counter << i success_counter << i
end end
@@ -94,7 +95,7 @@ module ClientTests
100.times.map do |i| 100.times.map do |i|
Thread.new do Thread.new do
success = @client.lock(TEST_KEY, 50, retry_timeout: 2) do success = @client.lock(TEST_KEY, 50, retry_timeout: 2) do
sleep(1) sleep(0.5)
success_counter << i success_counter << i
end end
@@ -114,7 +115,7 @@ class TestBaseClient < Minitest::Test
def test_not_implemented def test_not_implemented
assert_raises(NotImplementedError) do assert_raises(NotImplementedError) do
@klass.lock(TEST_KEY, 1) @klass.send(:get, TEST_KEY, {})
end end
end end
end end