
Quoting Data Collection

Data Collection Architecture Proposals in quoting

This document outlines the proposed architecture for data collection within the quoting project. In this design, DataCollector acts as an orchestrator, much like Feeder, and manages the lifecycles of the individual data sources. DataSourceCollector plays a role similar to PriceFeed: there is one implementation per type of data, such as trade data (collected via CEXClient) or liquidity data.

New Components

DataCollectionStorage

DataCollectionStorage serves as the persistence layer for data. It provides an abstract way to store time-series data from various data sources. It is designed to be storage-agnostic, with an initial implementation using Redis as the backend.

Interface

The DataCollectionStorage abstract base class provides an interface for all storage implementations. It defines methods to work with time-series data.

For example:

from abc import ABC, abstractmethod
from typing import Any, List, Tuple

class DataCollectionStorage(ABC):

    @abstractmethod
    def store(self, key: str, value: Any, timestamp: int) -> None:
        """Stores a data point associated with a key and timestamp."""
        pass

    @abstractmethod
    def fetch(self, key: str, start_time: int, end_time: int) -> List[Tuple[int, Any]]:
        """Fetches a series of data points based on a key and a time range."""
        pass
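To make the contract of store() and fetch() concrete, here is a minimal in-memory sketch of the interface. It is not part of the proposal: the InMemoryStorage class, the key name "trades:BTC-USDT", and the choice to treat both ends of the time range as inclusive are assumptions made only for illustration.

```python
import bisect
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Tuple


class DataCollectionStorage(ABC):
    """Interface from the proposal, repeated so the sketch is self-contained."""

    @abstractmethod
    def store(self, key: str, value: Any, timestamp: int) -> None: ...

    @abstractmethod
    def fetch(self, key: str, start_time: int, end_time: int) -> List[Tuple[int, Any]]: ...


class InMemoryStorage(DataCollectionStorage):
    """Hypothetical backend that keeps each series sorted by timestamp."""

    def __init__(self) -> None:
        # Two parallel lists per key: sorted timestamps and their values.
        self._timestamps: Dict[str, List[int]] = {}
        self._values: Dict[str, List[Any]] = {}

    def store(self, key: str, value: Any, timestamp: int) -> None:
        timestamps = self._timestamps.setdefault(key, [])
        values = self._values.setdefault(key, [])
        # Out-of-order points are inserted at their sorted position.
        index = bisect.bisect_right(timestamps, timestamp)
        timestamps.insert(index, timestamp)
        values.insert(index, value)

    def fetch(self, key: str, start_time: int, end_time: int) -> List[Tuple[int, Any]]:
        timestamps = self._timestamps.get(key, [])
        values = self._values.get(key, [])
        # Assumption: start_time and end_time are both inclusive.
        lo = bisect.bisect_left(timestamps, start_time)
        hi = bisect.bisect_right(timestamps, end_time)
        return list(zip(timestamps[lo:hi], values[lo:hi]))


storage = InMemoryStorage()
storage.store("trades:BTC-USDT", 101.0, timestamp=3000)
storage.store("trades:BTC-USDT", 100.0, timestamp=1000)
storage.store("trades:BTC-USDT", 100.5, timestamp=2000)
window = storage.fetch("trades:BTC-USDT", 1000, 2000)
```

A backend like this could also serve as a test double for collectors, so that they can be exercised without a running Redis instance.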

Redis Implementation

In the Redis implementation, the time-series data would be stored using Redis TimeSeries, which provides efficient storage and querying for time-based data.
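As a rough sketch of what such a backend might look like, the class below maps store() and fetch() onto the TS.ADD and TS.RANGE commands through the redis-py client's ts() accessor. The RedisStorage name and its constructor are assumptions, not the project's code. The sketch also assumes that values are numeric (Redis TimeSeries stores floating-point samples, so the Any in the interface would need to be narrowed or encoded) and that timestamps are Unix milliseconds, which is the Redis TimeSeries convention.

```python
from abc import ABC, abstractmethod
from typing import Any, List, Tuple


class DataCollectionStorage(ABC):
    """Interface from the proposal, repeated so the sketch is self-contained."""

    @abstractmethod
    def store(self, key: str, value: Any, timestamp: int) -> None: ...

    @abstractmethod
    def fetch(self, key: str, start_time: int, end_time: int) -> List[Tuple[int, Any]]: ...


class RedisStorage(DataCollectionStorage):
    """Hypothetical Redis TimeSeries backend (a sketch under the assumptions above)."""

    def __init__(self, client: Any) -> None:
        # `client` is expected to be a redis.Redis instance connected to a
        # server that has the RedisTimeSeries module loaded.
        self._ts = client.ts()

    def store(self, key: str, value: float, timestamp: int) -> None:
        # TS.ADD creates the series on the first write to a new key.
        self._ts.add(key, timestamp, value)

    def fetch(self, key: str, start_time: int, end_time: int) -> List[Tuple[int, float]]:
        # TS.RANGE treats both bounds as inclusive. It returns an error for a
        # key that has never been written, which callers may want to handle.
        samples = self._ts.range(key, start_time, end_time)
        return [(int(ts), float(value)) for ts, value in samples]
```

Taking the client as a constructor argument keeps connection settings out of the storage class and lets tests pass in a stand-in object.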

Modules to Add

For example:

quoting
└── storage
    ├── __init__.py
    ├── base.py
    └── redis_storage.py

DataSourceCollector

The DataSourceCollector is an interface that abstracts the logic for collecting data from various sources, managing its own lifecycle and data processing. It is useful when dealing with different data sources that have varying connection methods and update frequencies. It is used with a DataCollectionStorage backend to which it passes the collected data.

Interface

The interface includes a single asynchronous method, run(storage: DataCollectionStorage), which leaves each implementation free to decide how it collects and processes data. The same entry point can run in an infinite loop that listens to a WebSocket, or it can poll a source periodically over HTTP.

For example:

from abc import ABC, abstractmethod

class DataSourceCollector(ABC):

    @abstractmethod
    async def run(self, storage: DataCollectionStorage) -> None:
        """
        Starts the process of collecting and processing data.
        This method can run in an infinite loop, polling the data source or listening to a WebSocket.
        """
        pass

Implementation Details

The DataSourceCollector interface allows for various implementations depending on the data source and the connection type. For example:

  1. Implementations leveraging CEXClient.get_trades() for CEX trade data similar to the existing CEX Feeds.
  2. Custom liquidity monitoring solutions that may operate similarly to existing DEX Feeds.
  3. Potential implementations utilizing WebSockets for more efficient data collection.

These implementations are expected to be modular and easy to plug into the DataCollector orchestration layer.
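One possible shape for a polling implementation is sketched below. The PollingCollector class, the FetchFn callback type, and the interval_seconds parameter are hypothetical names introduced for this example. In a real collector the callback might wrap CEXClient.get_trades(), whose exact signature is not specified in this document.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Awaitable, Callable, List, Tuple


class DataCollectionStorage(ABC):
    """Only the method used by this sketch is repeated here."""

    @abstractmethod
    def store(self, key: str, value: Any, timestamp: int) -> None: ...


class DataSourceCollector(ABC):
    @abstractmethod
    async def run(self, storage: DataCollectionStorage) -> None: ...


# Hypothetical callback type: returns the (timestamp, value) pairs
# observed since the previous poll.
FetchFn = Callable[[], Awaitable[List[Tuple[int, Any]]]]


class PollingCollector(DataSourceCollector):
    """Hypothetical collector that polls a source at a fixed interval."""

    def __init__(self, key: str, fetch: FetchFn, interval_seconds: float) -> None:
        self._key = key
        self._fetch = fetch
        self._interval = interval_seconds

    async def run(self, storage: DataCollectionStorage) -> None:
        # Loops until the surrounding task is cancelled by the orchestrator.
        while True:
            for timestamp, value in await self._fetch():
                storage.store(self._key, value, timestamp)
            await asyncio.sleep(self._interval)
```

A WebSocket-based collector would keep the same run() signature and replace the sleep-and-poll loop with a loop over incoming messages.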

Modules to Add

For example:

quoting
└── data_collectors
    ├── __init__.py
    ├── base.py
    ├── trades_data_collector.py
    ├── liquidity_data_collector.py
    └── ...

DataCollector Interaction

The DataCollector class can simply call the run() method for each DataSourceCollector instance. Each DataSourceCollector operates independently by polling its data source or listening to a WebSocket, and it sends the data to the designated storage.

DataCollector

The DataCollector is similar to a Feeder in that it serves as an orchestrator for multiple DataSourceCollector instances. It is responsible for:

  1. Managing Lifecycles: The DataCollector invokes the run() method of each DataSourceCollector concurrently.

  2. Error Handling: The DataCollector catches exceptions and logs them. It also restarts the DataSourceCollector if it crashes.

from typing import List

class DataCollector:

    def __init__(self, collectors: List[DataSourceCollector], storage: DataCollectionStorage):
        self.collectors = collectors
        self.storage = storage

    async def run(self):
        """Starts the process of collecting and processing data."""
        # ...
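The body of run() is left open above. One way it could cover the two responsibilities listed earlier (running collectors concurrently, and logging and restarting on failure) is sketched here with asyncio.gather. The _supervise helper, the restart_delay parameter, and the decision to stop supervising a collector whose run() returns normally are assumptions of this sketch, not part of the proposal.

```python
import asyncio
import logging
from typing import Any, List

logger = logging.getLogger(__name__)


class DataCollector:
    def __init__(self, collectors: List[Any], storage: Any, restart_delay: float = 1.0) -> None:
        self.collectors = collectors
        self.storage = storage
        self.restart_delay = restart_delay  # hypothetical parameter, in seconds

    async def _supervise(self, collector: Any) -> None:
        """Runs one collector, restarting it after each crash."""
        while True:
            try:
                await collector.run(self.storage)
                return  # assumption: a clean return means the collector is done
            except asyncio.CancelledError:
                raise  # let shutdown propagate instead of restarting
            except Exception:
                logger.exception("%s crashed; restarting", type(collector).__name__)
                await asyncio.sleep(self.restart_delay)

    async def run(self) -> None:
        """Starts all collectors concurrently and waits for them."""
        await asyncio.gather(*(self._supervise(c) for c in self.collectors))
```

Because each collector is wrapped in its own supervising coroutine, a crash in one collector does not interrupt the others. A production version would probably add backoff or a restart limit.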

Module to Add

For example:

quoting
└── data_collector.py

load_data_source_collectors()

A function similar to load_feeds() is needed to load DataSourceCollector instances from a configuration file. The configuration probably should not be extended with additional fields. Instead, the existing feeds configuration should be reused, since it already defines which data needs to be collected. For example, PriceFeed could gain a method that returns the DataSourceCollector instances it requires.
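A minimal sketch of this idea follows. It assumes the feeds have already been loaded (for instance by load_feeds(), whose signature is not shown in this document), that each feed exposes a hypothetical data_source_collectors() method, and that each collector carries a hypothetical collector_id so that two feeds needing the same data share a single collector.

```python
from typing import Dict, Iterable, List, Protocol


class Collector(Protocol):
    collector_id: str  # hypothetical identity used for de-duplication


class FeedWithCollectors(Protocol):
    def data_source_collectors(self) -> List[Collector]: ...  # hypothetical method


def load_data_source_collectors(feeds: Iterable[FeedWithCollectors]) -> List[Collector]:
    """Gathers the collectors required by all feeds, one per collector_id."""
    unique: Dict[str, Collector] = {}
    for feed in feeds:
        for collector in feed.data_source_collectors():
            # Keep the first collector seen for each id; later duplicates are dropped.
            unique.setdefault(collector.collector_id, collector)
    return list(unique.values())
```

De-duplicating here would avoid collecting the same trade or liquidity data twice when several feeds depend on it.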

How Can It Be Used

The DataCollector can run as an independent daemon that accumulates the required data in isolation from other processes, in particular the Updater. The Updater can then consume the aggregated data through modified Feeds that read from the DataCollectionStorage.