Quoting Data Collection
Data Collection Architecture Proposals in quoting
This document outlines the architecture for data collection within the quoting project. In this framework, DataCollector acts as an orchestrator, similar to the role of Feeder, managing the lifecycles of the various data sources. DataSourceCollector, in turn, functions similarly to PriceFeed: there is one implementation per type of data, such as trade data (collected using CEXClient), liquidity data, and so on.
New Components
DataCollectionStorage
DataCollectionStorage serves as the persistence layer for data. It provides an abstract way to store time-series data from various data sources. It is designed to be storage-agnostic, with an initial implementation using Redis as the backend.
Interface
The DataCollectionStorage abstract base class provides an interface for all storage implementations. It defines methods to work with time-series data.
For example:
from abc import ABC, abstractmethod
from typing import Any, List, Tuple


class DataCollectionStorage(ABC):
    @abstractmethod
    def store(self, key: str, value: Any, timestamp: int) -> None:
        """Stores a data point associated with a key and timestamp."""
        pass

    @abstractmethod
    def fetch(self, key: str, start_time: int, end_time: int) -> List[Tuple[int, Any]]:
        """Fetches a series of data points based on a key and a time range."""
        pass
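To make the contract concrete, here is a minimal in-memory sketch of a storage backend, which might also be handy in unit tests. The class name InMemoryStorage is hypothetical and not part of the proposal, and treating both ends of the time range as inclusive is an assumption, since the interface does not specify it.

```python
import bisect
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Tuple


class DataCollectionStorage(ABC):
    """Repeats the interface above so this sketch is self-contained."""

    @abstractmethod
    def store(self, key: str, value: Any, timestamp: int) -> None: ...

    @abstractmethod
    def fetch(self, key: str, start_time: int, end_time: int) -> List[Tuple[int, Any]]: ...


class InMemoryStorage(DataCollectionStorage):
    """Hypothetical backend: per-key lists kept sorted by timestamp."""

    def __init__(self) -> None:
        self._timestamps: Dict[str, List[int]] = {}
        self._values: Dict[str, List[Any]] = {}

    def store(self, key: str, value: Any, timestamp: int) -> None:
        timestamps = self._timestamps.setdefault(key, [])
        values = self._values.setdefault(key, [])
        # Insert at the sorted position so out-of-order points are handled.
        index = bisect.bisect_right(timestamps, timestamp)
        timestamps.insert(index, timestamp)
        values.insert(index, value)

    def fetch(self, key: str, start_time: int, end_time: int) -> List[Tuple[int, Any]]:
        timestamps = self._timestamps.get(key, [])
        values = self._values.get(key, [])
        # Assumption: both start_time and end_time are inclusive.
        lo = bisect.bisect_left(timestamps, start_time)
        hi = bisect.bisect_right(timestamps, end_time)
        return list(zip(timestamps[lo:hi], values[lo:hi]))
```

Keeping timestamps sorted on insert means fetch only needs two binary searches, which is roughly the access pattern a real time-series backend would optimize for.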
Redis Implementation
In the Redis implementation, the time-series data would be stored using Redis TimeSeries, which provides efficient storage and querying for time-based data.
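As a rough sketch of what such a backend could look like, the class below wraps the ts() command group that redis-py exposes for Redis TimeSeries. The class name RedisTimeSeriesStorage is hypothetical, and the client is injected rather than created here, so connection details are left open. Note that Redis TimeSeries samples are numeric, so this sketch assumes values can be converted to float, which is narrower than the Any in the interface.

```python
from typing import Any, List, Tuple


class RedisTimeSeriesStorage:
    """Hypothetical Redis-backed storage; in the real code base it would
    presumably subclass DataCollectionStorage."""

    def __init__(self, client: Any) -> None:
        # Assumption: `client` is a redis.Redis instance connected to a
        # server that has the Redis TimeSeries module available.
        self._ts = client.ts()

    def store(self, key: str, value: float, timestamp: int) -> None:
        # TS.ADD appends one (timestamp, value) sample to the series at `key`.
        self._ts.add(key, timestamp, float(value))

    def fetch(self, key: str, start_time: int, end_time: int) -> List[Tuple[int, float]]:
        # TS.RANGE returns the samples between the two timestamps.
        samples = self._ts.range(key, start_time, end_time)
        return [(int(ts), float(value)) for ts, value in samples]
```

Richer payloads, such as full trade records, would need a different encoding, for example one series per numeric field; that choice is outside the scope of this sketch.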
Modules to Add
For example:
quoting
└── storage
    ├── __init__.py
    ├── base.py
    └── redis_storage.py
DataSourceCollector
The DataSourceCollector is an interface that abstracts the logic for collecting data from various sources, managing its own lifecycle and data processing. It is useful when dealing with different data sources that have varying connection methods and update frequencies. It is used with a DataCollectionStorage backend to which it passes the collected data.
Interface
The interface includes a single method, run(storage: DataCollectionStorage), which is asynchronous to allow for flexible data collection and processing. This one method can operate in various modes: running in an infinite loop to listen to a WebSocket, or performing periodic polling via HTTP.
For example:
from abc import ABC, abstractmethod


class DataSourceCollector(ABC):
    @abstractmethod
    async def run(self, storage: DataCollectionStorage) -> None:
        """
        Starts the process of collecting and processing data.
        This method can run in an infinite loop, polling the data source or listening to a WebSocket.
        """
        pass
Implementation Details
The DataSourceCollector interface allows for various implementations depending on the data source and the connection type.
For example:
- Implementations leveraging CEXClient.get_trades() for CEX trade data, similar to the existing CEX Feeds.
- Custom liquidity monitoring solutions that may operate similarly to existing DEX Feeds.
- Potential implementations utilizing WebSockets for more efficient data collection.
These implementations are expected to be modular and easy to plug into the DataCollector orchestration layer.
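The polling mode described above can be sketched as follows. Everything here is illustrative: PollingCollector, its fetch_latest callable, and the max_polls parameter are hypothetical names, and the callable stands in for whatever client call (for example a trades request) a real implementation would make. Timestamps are taken as milliseconds since the epoch, which is an assumption.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Optional


class PollingCollector:
    """Hypothetical collector that polls a source at a fixed interval.
    In the real code base it would presumably subclass DataSourceCollector."""

    def __init__(
        self,
        key: str,
        fetch_latest: Callable[[], Awaitable[Any]],
        interval: float,
        max_polls: Optional[int] = None,
    ) -> None:
        self.key = key
        self.fetch_latest = fetch_latest  # stand-in for the real data source call
        self.interval = interval          # seconds to wait between polls
        self.max_polls = max_polls        # None means poll forever

    async def run(self, storage: Any) -> None:
        polls = 0
        while self.max_polls is None or polls < self.max_polls:
            value = await self.fetch_latest()
            # Assumption: timestamps are milliseconds since the epoch.
            storage.store(self.key, value, int(time.time() * 1000))
            polls += 1
            # Yield to the event loop so other collectors can make progress.
            await asyncio.sleep(self.interval)
```

The max_polls cap exists only to make the loop easy to exercise in tests; a production collector would normally leave it unset and run until cancelled.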
Modules to Add
For example:
quoting
└── data_collectors
    ├── __init__.py
    ├── base.py
    ├── trades_data_collector.py
    ├── liquidity_data_collector.py
    └── ...
DataCollector Interaction
The DataCollector class can simply call the run() method for each DataSourceCollector instance. Each DataSourceCollector operates independently by polling its data source or listening to a WebSocket, and it sends the data to the designated storage.
DataCollector
The DataCollector is similar to a Feeder in that it serves as an orchestrator for multiple DataSourceCollector instances. It is responsible for:
- Managing Life Cycles: The DataCollector invokes the run() method of each DataSourceCollector concurrently.
- Error Handling: The DataCollector catches exceptions and logs them. It also restarts the DataSourceCollector if it crashes.
from typing import List


class DataCollector:
    def __init__(self, collectors: List[DataSourceCollector], storage: DataCollectionStorage):
        self.collectors = collectors
        self.storage = storage

    async def run(self):
        """Starts the process of collecting and processing data."""
        # ...
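One possible way to fill in run() is sketched below: each collector gets its own supervising coroutine that logs failures and restarts the collector, and all supervisors are awaited together with asyncio.gather. The _supervise helper and the restart_delay parameter are hypothetical additions, and restarting indefinitely is an assumption; the proposal does not specify a retry limit or backoff.

```python
import asyncio
import logging
from typing import Any, List

logger = logging.getLogger(__name__)


class DataCollector:
    def __init__(self, collectors: List[Any], storage: Any, restart_delay: float = 1.0):
        self.collectors = collectors
        self.storage = storage
        self.restart_delay = restart_delay  # hypothetical: pause before a restart

    async def _supervise(self, collector: Any) -> None:
        """Runs one collector, restarting it whenever it raises."""
        while True:
            try:
                await collector.run(self.storage)
                return  # the collector finished on its own; nothing to restart
            except Exception:
                # Task cancellation is not an Exception subclass, so a
                # shutdown request still propagates out of this loop.
                logger.exception("%s crashed, restarting", type(collector).__name__)
                await asyncio.sleep(self.restart_delay)

    async def run(self) -> None:
        """Starts all collectors concurrently and waits for them."""
        await asyncio.gather(*(self._supervise(c) for c in self.collectors))
```

Because each collector has its own supervisor, one crashing source does not interrupt the others.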
Module to Add
For example:
quoting
└── data_collector.py
load_data_source_collectors()
A function similar to load_feeds() is needed to load DataSourceCollector instances from a configuration file. The configuration should probably not be extended with additional fields. Instead, the existing feeds configuration should be used, as it already specifies which data needs to be collected. For example, this could be achieved by adding a method to PriceFeed that returns the necessary DataSourceCollector instances.
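A minimal sketch of that idea follows, assuming the feeds have already been loaded (for example by load_feeds()) and that each feed may expose a method returning its collectors. The method name get_data_source_collectors is hypothetical, as is the decision to skip duplicates so that a collector shared by several feeds runs only once.

```python
from typing import Any, Iterable, List


def load_data_source_collectors(feeds: Iterable[Any]) -> List[Any]:
    """Gathers the collectors required by the given feeds."""
    collectors: List[Any] = []
    for feed in feeds:
        # Hypothetical hook on PriceFeed; feeds without it need no collection.
        get_collectors = getattr(feed, "get_data_source_collectors", None)
        if get_collectors is None:
            continue
        for collector in get_collectors():
            # Skip collectors that another feed has already requested.
            if collector not in collectors:
                collectors.append(collector)
    return collectors
```

Whether two collectors count as duplicates depends on how equality is defined for them; by default Python compares by identity, so feeds would need to share instances or the collectors would need to implement __eq__.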
How Can It Be Used
The DataCollector can function as an independent daemon, concentrating on accumulating the required data in isolation from other processes, especially the Updater. At the same time, the Updater can easily use the collected data through modified Feeds that can access the DataCollectionStorage.