Class: Concurrent::ReentrantReadWriteLock

Inherits:
Synchronization::Object show all
Defined in:
lib/concurrent/atomic/reentrant_read_write_lock.rb

Overview

Re-entrant read-write lock implementation

Allows any number of concurrent readers, but only one concurrent writer (And while the "write" lock is taken, no read locks can be obtained either. Hence, the write lock can also be called an "exclusive" lock.)

If another thread has taken a read lock, any thread which wants a write lock will block until all the readers release their locks. However, once a thread starts waiting to obtain a write lock, any additional readers that come along will also wait (so writers are not starved).

A thread can acquire both a read and write lock at the same time. A thread can also acquire a read lock OR a write lock more than once. Only when the read (or write) lock is released as many times as it was acquired, will the thread actually let it go, allowing other threads which might have been waiting to proceed. Therefore the lock can be upgraded by first acquiring read lock and then write lock and that the lock can be downgraded by first having both read and write lock a releasing just the write lock.

If both read and write locks are acquired by the same thread, it is not strictly necessary to release them in the same order they were acquired. In other words, the following code is legal:

This implementation was inspired by java.util.concurrent.ReentrantReadWriteLock.

Examples:

lock = Concurrent::ReentrantReadWriteLock.new
lock.acquire_write_lock
lock.acquire_read_lock
lock.release_write_lock
# At this point, the current thread is holding only a read lock, not a write
# lock. So other threads can take read locks, but not a write lock.
lock.release_read_lock
# Now the current thread is not holding either a read or write lock, so
# another thread could potentially acquire a write lock.
lock = Concurrent::ReentrantReadWriteLock.new
lock.with_read_lock  { data.retrieve }
lock.with_write_lock { data.modify! }

See Also:

Instance Method Summary collapse

Constructor Details

#initializeReentrantReadWriteLock

Create a new ReentrantReadWriteLock in the unlocked state.



107
108
109
110
111
112
113
# File 'lib/concurrent/atomic/reentrant_read_write_lock.rb', line 107

def initialize
  super()
  @Counter    = AtomicFixnum.new(0)       # single integer which represents lock state
  @ReadQueue  = Synchronization::Lock.new # used to queue waiting readers
  @WriteQueue = Synchronization::Lock.new # used to queue waiting writers
  @HeldCount  = ThreadLocalVar.new(0)     # indicates # of R & W locks held by this thread
end

Instance Method Details

#acquire_read_lockBoolean

Acquire a read lock. If a write lock is held by another thread, will block until it is released.

Returns:

  • (Boolean)

    true if the lock is successfully acquired

Raises:



160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
# File 'lib/concurrent/atomic/reentrant_read_write_lock.rb', line 160

def acquire_read_lock
  if (held = @HeldCount.value) > 0
    # If we already have a lock, there's no need to wait
    if held & READ_LOCK_MASK == 0
      # But we do need to update the counter, if we were holding a write
      #   lock but not a read lock
      @Counter.update { |c| c + 1 }
    end
    @HeldCount.value = held + 1
    return true
  end

  while true
    c = @Counter.value
    raise ResourceLimitError.new('Too many reader threads') if max_readers?(c)

    # If a writer is waiting OR running when we first queue up, we need to wait
    if waiting_or_running_writer?(c)
      # Before going to sleep, check again with the ReadQueue mutex held
      @ReadQueue.synchronize do
        @ReadQueue.ns_wait if waiting_or_running_writer?
      end
      # Note: the above 'synchronize' block could have used #wait_until,
      #   but that waits repeatedly in a loop, checking the wait condition
      #   each time it wakes up (to protect against spurious wakeups)
      # But we are already in a loop, which is only broken when we successfully
      #   acquire the lock! So we don't care about spurious wakeups, and would
      #   rather not pay the extra overhead of using #wait_until

      # After a reader has waited once, they are allowed to "barge" ahead of waiting writers
      # But if a writer is *running*, the reader still needs to wait (naturally)
      while true
        c = @Counter.value
        if running_writer?(c)
          @ReadQueue.synchronize do
            @ReadQueue.ns_wait if running_writer?
          end
        elsif @Counter.compare_and_set(c, c+1)
          @HeldCount.value = held + 1
          return true
        end
      end
    elsif @Counter.compare_and_set(c, c+1)
      @HeldCount.value = held + 1
      return true
    end
  end
end

#acquire_write_lockBoolean

Acquire a write lock. Will block and wait for all active readers and writers.

Returns:

  • (Boolean)

    true if the lock is successfully acquired

Raises:



255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
# File 'lib/concurrent/atomic/reentrant_read_write_lock.rb', line 255

def acquire_write_lock
  if (held = @HeldCount.value) >= WRITE_LOCK_HELD
    # if we already have a write (exclusive) lock, there's no need to wait
    @HeldCount.value = held + WRITE_LOCK_HELD
    return true
  end

  while true
    c = @Counter.value
    raise ResourceLimitError.new('Too many writer threads') if max_writers?(c)

    # To go ahead and take the lock without waiting, there must be no writer
    #   running right now, AND no writers who came before us still waiting to
    #   acquire the lock
    # Additionally, if any read locks have been taken, we must hold all of them
    if c == held
      # If we successfully swap the RUNNING_WRITER bit on, then we can go ahead
      if @Counter.compare_and_set(c, c+RUNNING_WRITER)
        @HeldCount.value = held + WRITE_LOCK_HELD
        return true
      end
    elsif @Counter.compare_and_set(c, c+WAITING_WRITER)
      while true
        # Now we have successfully incremented, so no more readers will be able to increment
        #   (they will wait instead)
        # However, readers OR writers could decrement right here
        @WriteQueue.synchronize do
          # So we have to do another check inside the synchronized section
          # If a writer OR another reader is running, then go to sleep
          c = @Counter.value
          @WriteQueue.ns_wait if running_writer?(c) || running_readers(c) != held
        end
        # Note: if you are thinking of replacing the above 'synchronize' block
        # with #wait_until, read the comment in #acquire_read_lock first!

        # We just came out of a wait
        # If we successfully turn the RUNNING_WRITER bit on with an atomic swap,
        #   then we are OK to stop waiting and go ahead
        # Otherwise go back and wait again
        c = @Counter.value
        if !running_writer?(c) &&
           running_readers(c) == held &&
           @Counter.compare_and_set(c, c+RUNNING_WRITER-WAITING_WRITER)
          @HeldCount.value = held + WRITE_LOCK_HELD
          return true
        end
      end
    end
  end
end

#release_read_lockBoolean

Release a previously acquired read lock.

Returns:

  • (Boolean)

    true if the lock is successfully released



234
235
236
237
238
239
240
241
242
243
244
245
246
247
# File 'lib/concurrent/atomic/reentrant_read_write_lock.rb', line 234

def release_read_lock
  held = @HeldCount.value = @HeldCount.value - 1
  rlocks_held = held & READ_LOCK_MASK
  if rlocks_held == 0
    c = @Counter.update { |counter| counter - 1 }
    # If one or more writers were waiting, and we were the last reader, wake a writer up
    if waiting_or_running_writer?(c) && running_readers(c) == 0
      @WriteQueue.signal
    end
  elsif rlocks_held == READ_LOCK_MASK
    raise IllegalOperationError, "Cannot release a read lock which is not held"
  end
  true
end

#release_write_lockBoolean

Release a previously acquired write lock.

Returns:

  • (Boolean)

    true if the lock is successfully released



329
330
331
332
333
334
335
336
337
338
339
340
# File 'lib/concurrent/atomic/reentrant_read_write_lock.rb', line 329

def release_write_lock
  held = @HeldCount.value = @HeldCount.value - WRITE_LOCK_HELD
  wlocks_held = held & WRITE_LOCK_MASK
  if wlocks_held == 0
    c = @Counter.update { |counter| counter - RUNNING_WRITER }
    @ReadQueue.broadcast
    @WriteQueue.signal if waiting_writers(c) > 0
  elsif wlocks_held == WRITE_LOCK_MASK
    raise IllegalOperationError, "Cannot release a write lock which is not held"
  end
  true
end

#try_read_lockBoolean

Try to acquire a read lock and return true if we succeed. If it cannot be acquired immediately, return false.

Returns:

  • (Boolean)

    true if the lock is successfully acquired



213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
# File 'lib/concurrent/atomic/reentrant_read_write_lock.rb', line 213

def try_read_lock
  if (held = @HeldCount.value) > 0
    if held & READ_LOCK_MASK == 0
      # If we hold a write lock, but not a read lock...
      @Counter.update { |c| c + 1 }
    end
    @HeldCount.value = held + 1
    return true
  else
    c = @Counter.value
    if !waiting_or_running_writer?(c) && @Counter.compare_and_set(c, c+1)
      @HeldCount.value = held + 1
      return true
    end
  end
  false
end

#try_write_lockBoolean

Try to acquire a write lock and return true if we succeed. If it cannot be acquired immediately, return false.

Returns:

  • (Boolean)

    true if the lock is successfully acquired



310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
# File 'lib/concurrent/atomic/reentrant_read_write_lock.rb', line 310

def try_write_lock
  if (held = @HeldCount.value) >= WRITE_LOCK_HELD
    @HeldCount.value = held + WRITE_LOCK_HELD
    return true
  else
    c = @Counter.value
    if !waiting_or_running_writer?(c) &&
       running_readers(c) == held &&
       @Counter.compare_and_set(c, c+RUNNING_WRITER)
       @HeldCount.value = held + WRITE_LOCK_HELD
      return true
    end
  end
  false
end

#with_read_lock { ... } ⇒ Object

Execute a block operation within a read lock.

Yields:

  • the task to be performed within the lock.

Returns:

  • (Object)

    the result of the block operation.

Raises:



124
125
126
127
128
129
130
131
132
# File 'lib/concurrent/atomic/reentrant_read_write_lock.rb', line 124

def with_read_lock
  raise ArgumentError.new('no block given') unless block_given?
  acquire_read_lock
  begin
    yield
  ensure
    release_read_lock
  end
end

#with_write_lock { ... } ⇒ Object

Execute a block operation within a write lock.

Yields:

  • the task to be performed within the lock.

Returns:

  • (Object)

    the result of the block operation.

Raises:



143
144
145
146
147
148
149
150
151
# File 'lib/concurrent/atomic/reentrant_read_write_lock.rb', line 143

def with_write_lock
  raise ArgumentError.new('no block given') unless block_given?
  acquire_write_lock
  begin
    yield
  ensure
    release_write_lock
  end
end