Create Two Concurrent Async Tasks With Asyncio
Solution 1:
You are not using the ThreadPoolExecutor correctly, and you really don't want to use it here. Instead, set up consumers and producers to handle your socket and pipe, with queues to pass messages between them.
For each connection type, create a coroutine that creates the connection, then passes that single connection to both a consumer task and a producer task (created with asyncio.create_task()) for that connection. Use asyncio.wait() to run both tasks with return_when=asyncio.FIRST_COMPLETED, so you can cancel any that are still running when one of the two completes 'early' (e.g. has failed).

Use a queue to pass messages from the consumer of one connection to the producer of the other.
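Stripped of the socket and pipe specifics, that asyncio.wait() pattern looks like this minimal sketch; fast() and slow() are hypothetical stand-ins for a task that completes (or fails) first and a task that would otherwise keep running:

```python
import asyncio

async def fast():
    # stands in for a task that completes (or fails) first
    await asyncio.sleep(0.01)
    return 'fast done'

async def slow():
    # stands in for a task that would otherwise run forever
    await asyncio.sleep(60)

async def main():
    fast_task = asyncio.create_task(fast())
    slow_task = asyncio.create_task(slow())
    # return as soon as one of the two tasks completes
    done, pending = await asyncio.wait(
        [fast_task, slow_task],
        return_when=asyncio.FIRST_COMPLETED
    )
    # cancel whatever is still running
    for task in pending:
        task.cancel()
    # collect results; exceptions would be re-raised here
    return [task.result() for task in done]

print(asyncio.run(main()))  # ['fast done']
```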
sys.stdin and sys.stdout are blocking streams, so don't just read from and write to them! See https://gist.github.com/nathan-hoad/8966377 for a gist attempting to set up non-blocking STDIO streams, and the asyncio issue that asks for a non-blocking streams feature.

Don't use a global socket connection, and certainly not with two separate async with statements. Your send_to_socket() method would actually close the socket, because the async with connection as web_socket: context manager exits when the first message has been sent, and this then causes issues for the socket_receiver code, which assumes the socket remains open indefinitely.

Don't use threading here! Your connections are entirely managed by asyncio; threading would stomp majorly on this.
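To see why the second use of the connection would fail, here is a minimal sketch with a hypothetical Connection class standing in for the websocket object; like the real one, its context manager closes the connection on exit, so any later send fails:

```python
import asyncio

class Connection:
    # hypothetical stand-in for a websocket connection object
    def __init__(self):
        self.closed = False
    async def __aenter__(self):
        return self
    async def __aexit__(self, *exc):
        # the context manager closes the connection when the block exits
        self.closed = True
    async def send(self, message):
        if self.closed:
            raise RuntimeError('connection is closed')

async def main():
    connection = Connection()
    # first use works, but exiting the block closes the connection
    async with connection as web_socket:
        await web_socket.send('first')
    # code elsewhere that assumes the connection stays open now breaks
    try:
        await connection.send('second')
    except RuntimeError as exc:
        print(exc)  # connection is closed

asyncio.run(main())
```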
Executor instances (such as ThreadPoolExecutor from concurrent.futures) should only be used with regular callables, not with coroutines. Executor.submit() states that it takes a callable; passing in a coroutine with executor.submit(send_to_pipe(message)) or executor.submit(send_to_socket(message)) will cause an exception to be raised, as coroutines are not callables. You are probably not seeing an exception message because that exception is raised in the other thread.

This is the reason your socket_receiver() coroutine fails; it certainly starts, but its attempts to send messages fail. When I run your code against a local mocked-up websocket server, a warning is printed:

RuntimeWarning: coroutine 'send_to_socket' was never awaited
  executor.submit(send_to_socket(message))
When a coroutine is not awaited, the code in that coroutine is never executed. Wrapping the coroutine in one that prints the exception to stderr (try: callable() followed by except Exception: traceback.print_exc(file=sys.stderr)) you get:

Traceback (most recent call last):
  File "soq52219672.py", line 15, in log_exception
    callable()
TypeError: 'coroutine' object is not callable
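Both symptoms can be reproduced without any executor or thread at all; send_to_socket() here is a stand-in coroutine. Calling a coroutine function produces a coroutine object, and that object is not itself callable, which is exactly what Executor.submit() needs:

```python
import asyncio

async def send_to_socket(message):
    # stand-in for the coroutine that was submitted to the executor
    return message

coro = send_to_socket('hello')

# the coroutine *function* is callable, the coroutine *object* is not
print(callable(send_to_socket))  # True
print(callable(coro))            # False

# invoking the coroutine object raises the same TypeError as above
try:
    coro()
except TypeError as exc:
    print(exc)  # 'coroutine' object is not callable

# close it so Python doesn't warn that it was never awaited
coro.close()
```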
Executors should only be used to integrate code that can't be converted to using coroutines; the executor keeps that code running in parallel with the asyncio tasks without interference. If that code needs to interact with asyncio tasks, always use asyncio.run_coroutine_threadsafe() or loop.call_soon_threadsafe() to call across the boundary. See the Concurrency and multithreading section of the asyncio documentation.
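A minimal sketch of calling across that boundary, with hypothetical record() and blocking_worker() functions; the blocking function runs in an executor thread and schedules a coroutine back onto the event loop the thread-safe way:

```python
import asyncio

results = []

async def record(message):
    # coroutine that must run on the asyncio event loop
    results.append(message)

def blocking_worker(loop):
    # regular (non-async) code running in an executor thread; it must not
    # touch the loop directly, only through the *_threadsafe APIs
    future = asyncio.run_coroutine_threadsafe(record('from thread'), loop)
    future.result(timeout=5)  # block this thread until the loop ran record()

async def main():
    loop = asyncio.get_running_loop()
    # run the blocking function in an executor, as advised above
    await loop.run_in_executor(None, blocking_worker, loop)

asyncio.run(main())
print(results)  # ['from thread']
```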
Here is an example of how I'd rewrite your code to use the consumer/producer pattern, with stdio() based on the Nathan Hoad gist on the subject, plus a fallback for Windows, where support for treating stdio as pipes is limited:
import asyncio
import json
import os
import sys

import websockets


async def socket_consumer(socket, outgoing):
    # take messages from the web socket and push them into the queue
    async for message in socket:
        await outgoing.put(message)

async def socket_producer(socket, incoming):
    # take messages from the queue and send them to the socket
    while True:
        message = await incoming.get()
        jsonmessage = json.dumps(message)
        await socket.send(jsonmessage)

async def connect_socket(incoming, outgoing):
    header = {"Authorization": r"Basic XXXX="}
    uri = 'wss://XXXXXXXX'
    async with websockets.connect(uri, extra_headers=header) as websocket:
        # create tasks for the consumer and producer. The asyncio loop will
        # manage these independently
        consumer_task = asyncio.create_task(socket_consumer(websocket, outgoing))
        producer_task = asyncio.create_task(socket_producer(websocket, incoming))
        # start both tasks, but have the loop return to us when one of them
        # has ended. We can then cancel the remainder
        done, pending = await asyncio.wait(
            [consumer_task, producer_task],
            return_when=asyncio.FIRST_COMPLETED
        )
        for task in pending:
            task.cancel()
        # force a result check; if there was an exception it'll be re-raised
        for task in done:
            task.result()

# pipe support
async def stdio(loop=None):
    if loop is None:
        loop = asyncio.get_running_loop()

    if sys.platform == 'win32':
        # no support for asyncio stdio yet on Windows, see https://bugs.python.org/issue26832
        # use an executor to read from stdin and write to stdout
        class Win32StdinReader:
            def __init__(self):
                self.stdin = sys.stdin.buffer
            async def readline(self):
                # a single call to sys.stdin.readline() is thread-safe
                return await loop.run_in_executor(None, self.stdin.readline)

        class Win32StdoutWriter:
            def __init__(self):
                self.buffer = []
                self.stdout = sys.stdout.buffer
            def write(self, data):
                self.buffer.append(data)
            async def drain(self):
                data, self.buffer = self.buffer, []
                # a single call to sys.stdout.writelines() is thread-safe
                return await loop.run_in_executor(None, sys.stdout.writelines, data)

        return Win32StdinReader(), Win32StdoutWriter()

    reader = asyncio.StreamReader()
    await loop.connect_read_pipe(
        lambda: asyncio.StreamReaderProtocol(reader),
        sys.stdin
    )
    writer_transport, writer_protocol = await loop.connect_write_pipe(
        asyncio.streams.FlowControlMixin,
        os.fdopen(sys.stdout.fileno(), 'wb')
    )
    writer = asyncio.streams.StreamWriter(writer_transport, writer_protocol, None, loop)
    return reader, writer

async def pipe_consumer(pipereader, outgoing):
    # take messages from the pipe and push them into the queue
    while True:
        message = await pipereader.readline()
        if not message:
            break
        await outgoing.put(message.decode('utf8'))

async def pipe_producer(pipewriter, incoming):
    # take messages from the queue and send them to the pipe
    while True:
        jsonmessage = await incoming.get()
        message = json.loads(jsonmessage)
        type = int(message.get('header', {}).get('messageID', -1))
        # 1 is DENM message, 2 is CAM message
        if type in {1, 2}:
            pipewriter.write(jsonmessage.encode('utf8') + b'\n')
            await pipewriter.drain()

async def connect_pipe(incoming, outgoing):
    reader, writer = await stdio()
    # create tasks for the consumer and producer. The asyncio loop will
    # manage these independently
    consumer_task = asyncio.create_task(pipe_consumer(reader, outgoing))
    producer_task = asyncio.create_task(pipe_producer(writer, incoming))
    # start both tasks, but have the loop return to us when one of them
    # has ended. We can then cancel the remainder
    done, pending = await asyncio.wait(
        [consumer_task, producer_task],
        return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:
        task.cancel()
    # force a result check; if there was an exception it'll be re-raised
    for task in done:
        task.result()

async def main():
    pipe_to_socket = asyncio.Queue()
    socket_to_pipe = asyncio.Queue()
    socket_coro = connect_socket(pipe_to_socket, socket_to_pipe)
    pipe_coro = connect_pipe(socket_to_pipe, pipe_to_socket)
    await asyncio.gather(socket_coro, pipe_coro)

if __name__ == '__main__':
    asyncio.run(main())
This starts with two tasks: one to manage the socket, the other to manage the STDIO pipe. Each of these starts two more tasks of its own, for its consumer and producer. Two queues carry the messages from the consumer of one connection to the producer of the other.
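The cross-wired queues can be exercised on their own; this sketch uses in-memory lists as stand-ins for the socket and pipe endpoints, with simplified consumer() and producer() helpers in place of the real ones:

```python
import asyncio

async def consumer(source, outgoing):
    # read from one endpoint, push into the other side's queue
    for message in source:
        await outgoing.put(message)

async def producer(sink, incoming, count):
    # drain this side's queue into its endpoint
    for _ in range(count):
        sink.append(await incoming.get())

async def main():
    pipe_to_socket = asyncio.Queue()
    socket_to_pipe = asyncio.Queue()
    socket_out, pipe_out = [], []
    await asyncio.gather(
        consumer(['from pipe'], pipe_to_socket),    # pipe side reads input
        consumer(['from socket'], socket_to_pipe),  # socket side reads input
        producer(socket_out, pipe_to_socket, 1),    # socket side sends
        producer(pipe_out, socket_to_pipe, 1),      # pipe side sends
    )
    return socket_out, pipe_out

print(asyncio.run(main()))  # (['from pipe'], ['from socket'])
```

Each message read on one side ends up being sent by the other, which is exactly the routing the two queues in main() above provide.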