Asynchronous Tasks in Trading Applications; Mind the Gap
Now, let’s talk async Python. As we have discussed before, asynchronous programming is key to resource-efficient concurrency in Python applications with heavy I/O, such as trading applications. Anyone working with the IBKR APIs will surely have come across asynchronous contexts.
Our site is dedicated to improving your quant-dev skills. Since async is such an integral part of Python trading applications, it is today's topic.
One of the challenges of concurrent programming is error handling: many things are going on at once, and there are many points of failure. This is hardest with multi-core parallelism: the system needs to be fault tolerant across multiple processes, each with its own memory space, and may require IPC (inter-process communication) to synchronise activity. It is somewhat easier with threads, since threads share the same memory space, although we then need synchronisation primitives around the critical regions that modify shared variables. Easier still is the asynchronous paradigm: it is single threaded, so all coroutines share the same memory space and only switch at await points. Yet there are still complications.
One of those complications is understanding when an error is propagated up the call stack. Today, we look at a simple but common antipattern and how to work around it.
We start our main program and enter the asynchronous context, do some setup, and begin streaming data. Once we have enough market data, we may start performing computations and submitting orders to the exchange.
Perhaps something like this:
import asyncio

async def stream_l2_book():
    i = 10
    while i > 0:
        print("streamed data")
        i -= 1
        await asyncio.sleep(0.5)
    raise Exception("websocket error")  # simulated stream failure

async def model_book_and_trade():
    while True:
        print("adjusting positions")
        await asyncio.sleep(0.5)

async def main():
    task = asyncio.create_task(stream_l2_book())  # scheduled to run in the background
    await model_book_and_trade()


if __name__ == "__main__":
    asyncio.run(main())
We put the stream_l2_book coroutine inside a task. In a real application we want it to loop indefinitely and retrieve data from the market, say over HTTP or a websocket. We may store the data in some buffer, so that we can act on it. Since the stream loops indefinitely but we only want it to run in the background while we go ahead and submit trades, we wrap it in a task, which schedules it on the event loop. The create_task call returns immediately, and we can go ahead and call other coroutines while the task runs in the background.
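To make the scheduling behaviour concrete, here is a minimal sketch. The names background_stream and demo_scheduling are hypothetical and exist only for illustration; the sketch assumes nothing beyond the standard asyncio module. It shows that create_task only schedules the coroutine, and that the task first gets to run once the caller yields to the event loop.

```python
import asyncio

async def background_stream(buffer):
    # Hypothetical stand-in for a market-data stream: appends three ticks.
    for tick in range(3):
        buffer.append(tick)
        await asyncio.sleep(0.01)

async def demo_scheduling():
    buffer = []
    task = asyncio.create_task(background_stream(buffer))
    # create_task returns immediately; the task has not run a single step yet.
    ran_before_yield = len(buffer) > 0
    # Yielding control once gives the event loop a chance to start the task.
    await asyncio.sleep(0)
    ran_after_yield = len(buffer) > 0
    await task  # wait for the stream to finish so we can inspect the buffer
    return ran_before_yield, ran_after_yield, buffer

if __name__ == "__main__":
    print(asyncio.run(demo_scheduling()))
```

The task only makes progress while the caller is suspended at an await, which is why a long-running trading loop needs to await regularly for the stream to keep filling the buffer.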
Unfortunately, network connections, including socket streams, are often unstable, and your data stream might run into errors for a variety of reasons. The remote server could crash, terminate the session, close the socket stream, or run out of heap space, et cetera. In the example above, this is represented by the raise Exception line in stream_l2_book.
Since we are no longer streaming the l2 book, we want to stop submitting trades. But what happens instead?
adjusting positions
streamed data
adjusting positions
streamed data
adjusting positions
streamed data
adjusting positions
streamed data
adjusting positions
streamed data
adjusting positions
streamed data
adjusting positions
streamed data
adjusting positions
streamed data
adjusting positions
streamed data
adjusting positions
streamed data
adjusting positions
adjusting positions
adjusting positions
adjusting positions
adjusting positions
adjusting positions
adjusting positions ... continues submitting trades on stale l2 buffer
Only when we press CTRL-C do we get the following stack trace:
^CTraceback (most recent call last):
File "/usr/local/Cellar/python@3.11/3.11.4_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/Cellar/python@3.11/3.11.4_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "/test.py", line 18, in main
await model_book_and_trade()
File "/test.py", line 14, in model_book_and_trade
await asyncio.sleep(0.5)
File "/usr/local/Cellar/python@3.11/3.11.4_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/tasks.py", line 639, in sleep
return await future
^^^^^^^^^^^^
asyncio.exceptions.CancelledError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/test.py", line 22, in <module>
asyncio.run(main())
File "/usr/local/Cellar/python@3.11/3.11.4_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/runners.py", line 190, in run
return runner.run(main)
^^^^^^^^^^^^^^^^
File "/usr/local/Cellar/python@3.11/3.11.4_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/runners.py", line 123, in run
raise KeyboardInterrupt()
KeyboardInterrupt
Task exception was never retrieved
future: <Task finished name='Task-2' coro=<stream_l2_book() done, defined at /test.py:3> exception=Exception('websocket error')>
Traceback (most recent call last):
File "/test.py", line 9, in stream_l2_book
raise Exception("websocket error")
Exception: websocket error
On a not so funny note, as I was programming up my funding arbitrage bot over the last few weeks, this is an actual error I made, causing me to submit trades on old quotes.
The underlying concern is this: the exception is stored on the task and is only re-raised when the task is awaited (or its result is otherwise retrieved). But the data stream coroutine is designed to loop indefinitely, so we would never call await task before the trading function, since no subsequent line of code would then get to run.
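A small sketch of this retrieval behaviour, using hypothetical names (failing_stream, demo_retrieval) and only standard asyncio calls: the task fails in the background, nothing is raised in the caller, and the exception surfaces only when we ask for it via task.exception() or await the task.

```python
import asyncio

async def failing_stream():
    # Hypothetical stream that fails shortly after starting.
    await asyncio.sleep(0.01)
    raise ConnectionError("websocket error")

async def demo_retrieval():
    task = asyncio.create_task(failing_stream())
    # The task fails while we are busy elsewhere; nothing is raised here.
    while not task.done():
        await asyncio.sleep(0.01)
    observations = {
        # exception() returns the stored exception without raising it.
        "stored": repr(task.exception()),
    }
    try:
        await task  # awaiting a failed task re-raises the stored exception
    except ConnectionError as exc:
        observations["raised_on_await"] = str(exc)
    return observations

if __name__ == "__main__":
    print(asyncio.run(demo_retrieval()))
```

If neither call is ever made, asyncio can only log "Task exception was never retrieved" once the task object is cleaned up, which is the message at the bottom of the stack trace above.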
First of all, you always want to wrap checks around sensitive operations. Especially if the tasks were created within a local function context, you want to keep a reference to the task around so that you can check on its state. For instance, we could do:
import asyncio

# Registry of background tasks; an entry is replaced by the exception if its task fails.
tasks = {}

async def stream_l2_book():
    try:
        i = 10
        while i > 0:
            print("streamed data")
            i -= 1
            await asyncio.sleep(0.5)
        raise Exception("websocket error")  # simulated stream failure
    except Exception as exc:
        # Record the failure so other coroutines can see that the stream is dead.
        tasks["l2_stream"] = exc
        raise

async def model_book_and_trade():
    while True:
        # Only trade while the stream has not recorded a failure.
        if not isinstance(tasks["l2_stream"], Exception):
            print("adjusting positions")
        else:
            print("l2 quotes are stale")
            return
        await asyncio.sleep(0.5)

async def main():
    task = asyncio.create_task(stream_l2_book())
    tasks["l2_stream"] = task
    await model_book_and_trade()
    try:
        await task  # retrieve the exception raised by the stream
    except Exception:
        print("shutdown")

if __name__ == "__main__":
    asyncio.run(main())
We keep track of the tasks inside a dictionary, and also keep a reference in the main context. Inside the sensitive trading function, we verify that our stream is still active; otherwise we cease trading.
We can also choose to await the task in main to retrieve the exception. This is our adjusted output:
adjusting positions
streamed data
adjusting positions
streamed data
adjusting positions
streamed data
adjusting positions
streamed data
adjusting positions
streamed data
adjusting positions
streamed data
adjusting positions
streamed data
adjusting positions
streamed data
adjusting positions
streamed data
adjusting positions
streamed data
adjusting positions
l2 quotes are stale
shutdown
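As a possible alternative to polling a shared dictionary, one could let a supervisor wait on both tasks and react as soon as either fails. The sketch below is not the approach used above; it swaps in asyncio.wait with return_when=asyncio.FIRST_EXCEPTION. The supervise function, the ticks parameter, and the log list are hypothetical names added for illustration, and the sleep intervals are shortened so that it finishes quickly.

```python
import asyncio

async def stream_l2_book(ticks=3):
    # Simulated stream: a few ticks, then a connection failure.
    for _ in range(ticks):
        await asyncio.sleep(0.01)
    raise ConnectionError("websocket error")

async def model_book_and_trade(log):
    # Simulated trading loop; records each adjustment instead of printing.
    while True:
        log.append("adjusting positions")
        await asyncio.sleep(0.01)

async def supervise():
    log = []
    stream = asyncio.create_task(stream_l2_book())
    trader = asyncio.create_task(model_book_and_trade(log))
    # Returns as soon as any task raises, or when all tasks finish.
    done, pending = await asyncio.wait(
        {stream, trader}, return_when=asyncio.FIRST_EXCEPTION
    )
    for task in pending:
        task.cancel()  # the stream is gone, so stop trading
    # Let cancelled tasks unwind; collect their results instead of raising.
    await asyncio.gather(*pending, return_exceptions=True)
    errors = [t.exception() for t in done if t.exception() is not None]
    return log, errors, trader.cancelled()

if __name__ == "__main__":
    log, errors, trader_cancelled = asyncio.run(supervise())
    print(len(log), errors, trader_cancelled)
```

With this design the trading loop is cancelled by the supervisor rather than checking the stream itself, so the failure is handled in one place. On Python 3.11 and later, asyncio.TaskGroup provides similar behaviour by cancelling the remaining tasks in the group when one of them fails.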
Hope that was a good learn!
Cheers.