Synchronous Blocking Of Multiple Resources
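Abstract situation: we have 2 sheep that can be used concurrently (Semaphore(2)) and 1 gate that only one task can use at a time. We want to spend a sheep through the gate 2 times (each time we need 1 sheep and 1 gate for 1 second) and feed a sheep 1 time (this needs 1 sheep for 2 seconds).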
Solution 1:
You could implement an asynchronous context manager that handles multiple locks. This object should make sure it doesn't hold any lock while waiting for another non-available lock:
class multilock:
    """Asynchronous context manager that holds several locks at once.

    While it has to wait for a lock that is currently taken, it first gives
    back every lock it already holds, so it never blocks other tasks by
    holding one lock while waiting for another.

    Each lock must provide ``acquire()`` (a coroutine function),
    ``release()`` and ``locked()``, as ``asyncio.Lock`` does.

    The ``async with`` protocol is implemented here directly instead of being
    inherited from the private ``asyncio.locks._ContextManagerMixin``.
    """

    def __init__(self, *locks):
        # Locks this object does not hold right now.
        self.released = list(locks)
        # Locks this object currently holds.
        self.acquired = []

    async def acquire(self):
        """Acquire every lock; on failure or cancellation hold none of them."""
        while self.released:
            lock = self.released.pop()
            if lock.locked():
                # We are about to wait: drop what we hold so others can
                # progress. The dropped locks go back to ``released`` and are
                # re-acquired by later iterations of this loop.
                self.release()
            try:
                await lock.acquire()
            except BaseException:
                # Keep the bookkeeping consistent: the lock we were waiting
                # for is still not ours, and nothing may stay held.
                self.released.append(lock)
                self.release()
                raise
            self.acquired.append(lock)

    def release(self):
        """Release every lock currently held, most recently acquired first."""
        while self.acquired:
            lock = self.acquired.pop()
            lock.release()
            self.released.append(lock)

    async def __aenter__(self):
        await self.acquire()
        # There is nothing useful to bind in an "as" clause.
        return None

    async def __aexit__(self, exc_type, exc, tb):
        self.release()
Example:
async def test(lock1, lock2):
    """Hold both locks while the body runs, releasing them afterwards."""
    both = multilock(lock1, lock2)
    async with both:
        print('Do something')
Solution 2:
Based on this solution, I created a solution for this example. We need two things:
- add a locked() method to Sheep and Gate that checks whether the object can be acquired right now;
- add and use a new MultiAcquire task that acquires the objects only if all of them can be acquired right now (and otherwise suspends until a release event).
Here's the final code; see MultiAcquire - it's the main part:
import asyncio
class Sheep:
    """Handle on one of the sheep drawn from a pool shared by all instances.

    ``reason`` is only used to label the messages printed on acquire and
    release. Instances can be used with ``async with`` so that the sheep is
    given back even if the body raises.
    """

    _sem = asyncio.Semaphore(2)  # at most 2 sheep are available at a time

    def __init__(self, reason):
        self._reason = reason

    async def acquire(self):
        """Wait until a sheep is free, take it, and report it."""
        await type(self)._sem.acquire()
        print('acquire sheep ({})'.format(self._reason))

    def release(self):
        """Report the release and return the sheep to the pool."""
        print('release sheep ({})'.format(self._reason))
        type(self)._sem.release()

    def locked(self):
        """Return True if no sheep can be acquired immediately."""
        return type(self)._sem.locked()

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Looked up on the instance so a release() replaced on this object
        # (e.g. wrapped with a notification hook) is honoured.
        self.release()
class Gate:
    """Handle on the single gate shared by all instances.

    ``reason`` is only used to label the messages printed on acquire and
    release. Instances can be used with ``async with`` so that the gate is
    given back even if the body raises.
    """

    _sem = asyncio.Semaphore(1)  # only 1 gate is available at a time

    def __init__(self, reason):
        self._reason = reason

    async def acquire(self):
        """Wait until the gate is free, take it, and report it."""
        await type(self)._sem.acquire()
        print('acquire gate ({})'.format(self._reason))

    def release(self):
        """Report the release and free the gate."""
        print('release gate ({})'.format(self._reason))
        type(self)._sem.release()

    def locked(self):
        """Return True if the gate cannot be acquired immediately."""
        return type(self)._sem.locked()

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Looked up on the instance so a release() replaced on this object
        # (e.g. wrapped with a notification hook) is honoured.
        self.release()
class MultiAcquire(asyncio.Task):
_check_lock = asyncio.Lock() # to suspend for creating task that acquires objects
_release_event = asyncio.Event() # to suspend for any object was released
def __init__(self, locks):
super().__init__(self._task_coro())
self._locks = locks
# Here we use decorator to subscribe all release() calls,
# _release_event would be set in this case:
for l in self._locks:
l.release = self._notify(l.release)
async def _task_coro(self):
while True:
# Create task to acquire all locks and break on success:
async with type(self)._check_lock:
if not any(l.locked() for l in self._locks): # task would be created only if all objects can be acquired
task = asyncio.gather(*[l.acquire() for l in self._locks]) # create task to acquire all objects
await asyncio.sleep(0) # start task without waiting for it
break
# Wait for any release() to try again:
await type(self)._release_event.wait()
# Wait for task:
return await task
def _notify(self, func):
def wrapper(*args, **kwargs):
type(self)._release_event.set()
type(self)._release_event.clear()
return func(*args, **kwargs)
return wrapper
async def spend(reason, *, duration=1):
    """Take one sheep and the gate together, hold them, then release both.

    ``reason`` labels the printed messages; ``duration`` is how long, in
    seconds, both resources are held (1 second by default).
    """
    sheep = Sheep(reason)
    gate = Gate(reason)
    await MultiAcquire([sheep, gate])  # block 1 sheep, 1 gate
    try:
        await asyncio.sleep(duration)
        print('Spend sheep through a gate')
    finally:
        # Release even if the sleep is cancelled or raises, so the sheep and
        # the gate are not lost to other tasks.
        sheep.release()
        gate.release()
async def feed(reason, *, duration=2):
    """Take one sheep, hold it while feeding, then release it.

    ``reason`` labels the printed messages; ``duration`` is how long, in
    seconds, the sheep is held (2 seconds by default).
    """
    sheep = Sheep(reason)
    await MultiAcquire([sheep])  # block 1 sheep
    try:
        await asyncio.sleep(duration)
        print('Feed sheep')
    finally:
        # Release even if the sleep is cancelled or raises, so the sheep is
        # not lost to other tasks.
        sheep.release()
async def main():
    """Run two spend jobs and one feed job concurrently and wait for all."""
    jobs = [
        spend('spend 1'),
        feed('feed 1'),
        spend('spend 2'),
    ]
    await asyncio.gather(*jobs)
if __name__ == "__main__":
    # asyncio.run() creates a fresh event loop, runs main() to completion and
    # closes the loop afterwards. Calling asyncio.get_event_loop() with no
    # running loop is deprecated and fails on recent Python versions.
    asyncio.run(main())
Output:
acquire gate (spend 2)
acquire sheep (spend 2)
acquire sheep (feed 1)
Spend sheep through a gate
release sheep (spend 2)
release gate (spend 2)
acquire sheep (spend 1)
acquire gate (spend 1)
Feed sheep
release sheep (feed 1)
Spend sheep through a gate
release sheep (spend 1)
release gate (spend 1)
[Finished in 2.2s]
Post a Comment for "Synchronous Blocking Of Multiple Resources"