Skip to content Skip to sidebar Skip to footer

Synchronous Blocking Of Multiple Resources

Abstract situation: we have 2 sheep that can be used concurrently (Semaphore(2)) and 1 gate that can be used by only one task at a time. We want to spend a sheep through the gate 2 times (each time we need …

Solution 1:

You could implement an asynchronous context manager that handles multiple locks. This object should make sure it doesn't hold any lock while waiting for another non-available lock:

class multilock(asyncio.locks._ContextManagerMixin):
    """Acquire several asyncio locks as a single unit.

    Before waiting on a lock that is currently busy, every lock already held
    by this object is released, so it does not hold one lock while waiting
    for another that is unavailable.

    The base class is a private asyncio helper; it is presumably what makes
    ``async with multilock(...)`` call ``acquire()`` and ``release()``.

    Attributes:
        released: locks managed by this object that it does not hold.
        acquired: locks this object currently holds.
    """

    def __init__(self, *locks):
        self.released = list(locks)
        self.acquired = []

    async def acquire(self):
        """Acquire every managed lock.

        Raises:
            BaseException: whatever ``lock.acquire()`` raises (for example
                ``asyncio.CancelledError``). In that case every lock is
                handed back and recorded in ``released`` again, so the
                object stays reusable.
        """
        while self.released:
            lock = self.released.pop()
            if lock.locked():
                # About to wait: give back everything we hold first.
                self.release()
            try:
                await lock.acquire()
            except BaseException:
                # The wait was interrupted. Put the popped lock back so it is
                # not forgotten, and do not keep any lock that was obtained
                # earlier in this attempt.
                self.released.append(lock)
                self.release()
                raise
            self.acquired.append(lock)

    def release(self):
        """Release every held lock and mark it as available to re-acquire."""
        while self.acquired:
            lock = self.acquired.pop()
            lock.release()
            self.released.append(lock)

Example:

async def test(lock1, lock2):
    """Usage example: hold both locks for the duration of the body."""
    both = multilock(lock1, lock2)
    async with both:
        print('Do something')

Solution 2:

Based on that solution, I created a solution for this example. We need two things:

  1. Add a locked() method to Sheep and Gate that reports whether the object cannot be acquired right now.

  2. Add and use a new MultiAcquire task that acquires the objects only if all of them can be acquired right now (and otherwise suspends until a release event occurs).

Here's the final code; see MultiAcquire, which is the main part:

import asyncio


class Sheep:
    """One claim on the shared pool of sheep.

    All instances draw from the same class-level semaphore, so the limit
    applies across every ``Sheep`` object.
    """

    # Shared by all instances: at most 2 sheep are available at a time.
    _sem = asyncio.Semaphore(2)

    def __init__(self, reason):
        # Label that appears in the acquire/release log lines.
        self._reason = reason

    async def acquire(self):
        """Wait until a sheep is free, take it, then log the acquisition."""
        pool = type(self)._sem
        await pool.acquire()
        message = 'acquire sheep ({})'.format(self._reason)
        print(message)

    def release(self):
        """Log the release, then return the sheep to the shared pool."""
        message = 'release sheep ({})'.format(self._reason)
        print(message)
        pool = type(self)._sem
        pool.release()

    def locked(self):
        """Return the shared semaphore's ``locked()`` state."""
        pool = type(self)._sem
        return pool.locked()


class Gate:
    """One claim on the single shared gate.

    The semaphore lives on the class, so every ``Gate`` instance competes
    for the same slot.
    """

    # Shared by all instances: only 1 gate is available at a time.
    _sem = asyncio.Semaphore(1)

    def __init__(self, reason):
        # Label that appears in the acquire/release log lines.
        self._reason = reason

    async def acquire(self):
        """Wait until the gate is free, take it, then log the acquisition."""
        shared = type(self)._sem
        await shared.acquire()
        line = 'acquire gate ({})'.format(self._reason)
        print(line)

    def release(self):
        """Log the release, then free the gate."""
        line = 'release gate ({})'.format(self._reason)
        print(line)
        shared = type(self)._sem
        shared.release()

    def locked(self):
        """Return the shared semaphore's ``locked()`` state."""
        shared = type(self)._sem
        return shared.locked()


class MultiAcquire(asyncio.Task):
    """Task that acquires all given objects together.

    The objects are only acquired when none of them reports ``locked()``;
    otherwise the task waits for some wrapped ``release()`` call and then
    checks again. Awaiting the task yields the result of gathering every
    object's ``acquire()``.

    Each object in ``locks`` must provide ``locked()``, an awaitable
    ``acquire()`` and ``release()``.
    """

    # Class-wide lock: only one instance at a time may run the
    # "check locked() and start acquiring" section.
    _check_lock = asyncio.Lock()
    # Class-wide event pulsed (set, then immediately cleared) whenever a
    # wrapped release() is called, to wake instances waiting to retry.
    _release_event = asyncio.Event()

    def __init__(self, locks):
        # NOTE(review): the coroutine object is created before _locks is
        # assigned; this relies on the task body not starting to run until
        # after __init__ returns -- confirm against asyncio.Task semantics.
        super().__init__(self._task_coro())
        self._locks = locks
        # Replace each object's release attribute with a wrapper that pulses
        # _release_event before delegating to the original release():
        for l in self._locks:
            l.release = self._notify(l.release)

    async def _task_coro(self):
        """Loop until every object can be acquired, then acquire them all.

        Returns:
            The result of awaiting ``asyncio.gather`` over each object's
            ``acquire()``.
        """
        while True:
            # Check and start acquiring under the class-wide lock, and leave
            # the loop on success:
            async with type(self)._check_lock:
                # Proceed only if no object reports locked().
                if not any(l.locked() for l in self._locks):
                    # Schedule acquire() for every object.
                    task = asyncio.gather(*[l.acquire() for l in self._locks])
                    # Yield to the event loop once so the gathered acquires
                    # can start, without waiting for them to finish.
                    await asyncio.sleep(0)
                    break
            # Not everything was free: wait for any wrapped release() before
            # trying again.
            await type(self)._release_event.wait()
        # Wait for the gathered acquires to complete:
        return await task

    def _notify(self, func):
        """Return a wrapper that pulses ``_release_event`` and then calls ``func``."""
        def wrapper(*args, **kwargs):
            # set() followed directly by clear(): a pulse aimed at tasks
            # already waiting on the event; the event is left cleared.
            type(self)._release_event.set()
            type(self)._release_event.clear()
            return func(*args, **kwargs)
        return wrapper


async def spend(reason):
    """Hold one sheep and the gate for one second, then release both.

    Args:
        reason: label passed to ``Sheep`` and ``Gate`` for their log output.
    """
    sheep = Sheep(reason)
    gate = Gate(reason)
    await MultiAcquire([sheep, gate])  # block 1 sheep, 1 gate
    try:
        await asyncio.sleep(1)  # 1 second
        print('Spend sheep through a gate')
    finally:
        # Release even if the body is cancelled or raises; otherwise the
        # shared sheep and gate slots would stay taken.
        sheep.release()
        gate.release()


async def feed(reason):
    """Hold one sheep for two seconds, then release it.

    Args:
        reason: label passed to ``Sheep`` for its log output.
    """
    sheep = Sheep(reason)
    await MultiAcquire([sheep])  # block 1 sheep
    try:
        await asyncio.sleep(2)  # 2 seconds
        print('Feed sheep')
    finally:
        # Release even if the body is cancelled or raises; otherwise the
        # shared sheep slot would stay taken.
        sheep.release()


async def main():
    """Run two spend jobs and one feed job concurrently."""
    jobs = (
        spend('spend 1'),
        feed('feed 1'),
        spend('spend 2'),
    )
    # Spend 2 times, feed 1 time.
    await asyncio.gather(*jobs)


if __name__ == "__main__":
    # asyncio.run() creates a fresh event loop, runs main() to completion and
    # closes the loop afterwards. Calling asyncio.get_event_loop() with no
    # running loop is deprecated in recent Python releases.
    # NOTE(review): the class-level Semaphore/Lock/Event objects are created
    # at import time; on Python < 3.10 they bind to a different loop than the
    # one asyncio.run() creates -- confirm the target Python version.
    asyncio.run(main())

Output:

acquire gate (spend 2)
acquire sheep (spend 2)
acquire sheep (feed 1)
Spend sheep through a gate
release sheep (spend 2)
release gate (spend 2)
acquire sheep (spend 1)
acquire gate (spend 1)
Feed sheep
release sheep (feed 1)
Spend sheep through a gate
release sheep (spend 1)
release gate (spend 1)
[Finished in 2.2s]

Post a Comment for "Synchronous Blocking Of Multiple Resources"