Design Patterns for Order Latency - Why You May Need to Implement Your Own Gateway for Trading
Recently, I announced some exciting upcoming things for quantpylib:
as well as a demonstration of its powers:
Today, I want to talk about design principles when order latency is a concern. Some topics are specific to Python, and others are more general, so do read on. I want to specifically address how popular SDKs are designed, and why that design is not ideal for, say, low-latency trading.
We will write some code. Let’s take a look at the first design, synchronous requests, as implemented in the official hyperliquid-python-sdk:
https://github.com/hyperliquid-dex/hyperliquid-python-sdk/blob/master/hyperliquid/exchange.py
It is clear from this snapshot that _post_action leads to something like requests.post down the call stack. Let’s emulate that in code:
import threading
import requests
import asyncio
import aiohttp

order = {"BTC": "BUY"}

def request_limit():
    # blocking call: a fresh TCP connection, and a full handshake, every time
    res = requests.post("https://www.example.com", json=order)
    res.text

async def main():
    #1 worst: sequential, blocking requests inside a coroutine
    [request_limit() for _ in range(100)]

if __name__ == "__main__":
    asyncio.run(main())
I tag this #1 worst, because each limit order has to wait for the previous one to finish. Each request performs a fresh TCP handshake, since no session is kept alive between requests. Worse, the blocking calls stall the event loop itself, so nothing else can make progress while orders are in flight.
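(As an aside, even the synchronous world has a partial remedy: a requests.Session pools and reuses the underlying TCP connections across calls. A minimal sketch, still sequential and still blocking:)

import requests

order = {"BTC": "BUY"}
session = requests.Session()  # pools and reuses TCP connections

def request_limit_keepalive():
    # the handshake is amortised across calls, but each call still blocks
    res = session.post("https://www.example.com", json=order)
    res.text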
Let’s fix one issue at a time. Perhaps we don’t want orders to wait for the previous one to finish, so we submit them concurrently:
#2 bad: one OS thread per order
threads = [threading.Thread(target=request_limit) for _ in range(100)]
[thread.start() for thread in threads]
[thread.join() for thread in threads]
This is significantly better than #1, but it is resource-wasteful and slower than it could be, since OS threads are not as lightweight as coroutines. Each request also still performs its own TCP handshake:
#3 decent: lightweight coroutines, but a fresh session (and handshake) per request
async def arequest_limit():
    async with aiohttp.ClientSession() as session:
        async with session.post("https://www.example.com", json=order) as res:
            await res.text()

await asyncio.gather(*[arequest_limit() for _ in range(5)])
We can fix the resource waste with more lightweight coroutines, making asynchronous requests through aiohttp client sessions. But since each request is isolated within its own context manager, the session is not kept alive between requests.
We can instead set a session object as an attribute of the exchange client:
class ExchangeClient:
    def __init__(self):
        # one long-lived session, shared by every request
        self.session = aiohttp.ClientSession()

    async def arequest_limit(self):
        async with self.session.post("https://www.example.com", json=order) as res:
            await res.text()

client = ExchangeClient()
await asyncio.gather(*[client.arequest_limit() for _ in range(5)])
This is actually not bad, but the session might have been terminated on either the client or the server side unbeknownst to us, and if we don’t clean it up, things can get messy:
async def close_session(self):
    await self.session.close()
and something like

await client.close_session()

is required in the error handling of every network request, but in SDKs this is often not implemented. See the asynchronous client implementation in the popular python-binance library:
I have often had to write troublesome error-handling logic because of this.
A session is created on object init, and each request then reuses this session. There is no error handler; closing or renewing sessions is left to the caller, which is flexible but unsafe. On the positive side, the TCP handshake is done once and shared across all requests to the Binance server.
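To make that concrete, here is a minimal emulation of that pattern (illustrative names, not the actual python-binance source), with the session-renewal error handling that stock clients tend to leave out:

class BinanceLikeClient:
    def __init__(self):
        # session created once on init; every request reuses its connection pool
        self.session = aiohttp.ClientSession()

    async def _request(self, url, payload):
        try:
            async with self.session.post(url, json=payload) as res:
                return await res.text()
        except aiohttp.ClientError:
            # the session may be dead; close and renew it so subsequent
            # requests start from a clean state, then surface the error
            await self.session.close()
            self.session = aiohttp.ClientSession()
            raise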
We are going to propose a different design pattern that maintains the asynchronous context manager while maximising throughput, subject to the load permitted by the exchange server. First, our architecture uses message queues and futures as the notification system.
A limit order function is written to share the same interface as any other normal request, as you would find in CCXT or python-binance:
limit_order_queue = asyncio.Queue()

async def limit_order():
    # enqueue the order spec along with a future, then await the response on it
    fut = asyncio.Future()
    await limit_order_queue.put((fut, order))
    print(await fut)
This abstracts the implementation. Internally, however, it puts the order specs on an asynchronous message queue. It waits for the result of the response using a future.
At application startup, an asynchronous task is put on the event loop; it listens for order items on the queue and immediately submits them:
async def submit_limit():
    global limit_order_queue
    async with aiohttp.ClientSession() as session:
        while True:
            # block until an order arrives, then submit it over the shared session
            fut, _order = await limit_order_queue.get()
            async with session.post(
                "https://www.example.com", json=_order
            ) as res:
                fut.set_result(await res.text())
The result of the response is set on the future to return it to the limit order function context. But why is this not great?
#4 not good: a single consumer serialises all submissions
task = asyncio.create_task(submit_limit())
await asyncio.gather(*[limit_order() for _ in range(100)])
while True:
    # stream ticks and trades
    await asyncio.sleep(100)
A single session is used and the TCP handshake is shared, but this

async with session.post(...) as res:
    await res.text()

has to complete a network round trip before the worker enters the next iteration of the while loop. Orders are put on the queue faster than this round trip completes, so they back up and incur latency. We can easily customise the throughput, subject to exchange limitations, by having worker tasks compete for the same message queue:
num_workers = 30
# keep references to the workers so they are not garbage-collected mid-flight
workers = [asyncio.create_task(submit_limit()) for _ in range(num_workers)]
await asyncio.gather(*[limit_order() for _ in range(100)])
await asyncio.sleep(100)
This way, we maintain the context manager and its cleanup while achieving high throughput. Each worker task performs its own handshake once and shares the connection with all its subsequent requests.
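Because each worker wraps its session in an async context manager, cancelling the workers is what actually triggers that cleanup: cancellation unwinds the async with and closes the session. A minimal shutdown sketch, assuming the workers list above:

# gracefully stop the workers; CancelledError propagates into each worker's
# `async with aiohttp.ClientSession()`, which closes the session cleanly
for worker in workers:
    worker.cancel()
await asyncio.gather(*workers, return_exceptions=True)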
Of course, one should be careful with asynchronous tasks: the event loop holds only weak references to them, so a fire-and-forget task created with asyncio.create_task can be garbage-collected before it finishes.
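A minimal sketch of the reference-keeping pattern recommended by the asyncio documentation:

background_tasks = set()

for _ in range(num_workers):
    task = asyncio.create_task(submit_limit())
    # hold a strong reference so the loop's weak reference isn't the only one,
    # and discard it automatically once the task completes
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)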
Have fun~
Code: