Ingest Source Research

The purpose of this page is to specify the requirements for Data Source Ingestion in VDK, outlining the functionality needed to serve Plugin Developers, Data Users, and Operators. It serves as a guide for generating ideas and for development.

Personas

  • Extension (Plugin) Developer - the developer building plugins for different types of data sources.
  • (Data) User - the user (data practitioner, engineer, scientist) using the plugins to create their own datasets.
  • Operator - ingestion runs on some infrastructure and needs to be operated by IT, data, or DevOps teams. It needs to scale and be robust and stable.

Requirements

Data Source Extension API

Target Persona: Plugin Developer

As a plugin developer, I want to be able to build a plugin for a new type of data source.

Connection Management

The API should make it possible to handle connectivity to data sources in a secure, efficient way with minimal user intervention. It should accommodate a variety of data source extensions such as relational databases, NoSQL databases, file systems, cloud storage, web APIs, and so on.
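
A minimal sketch of how a plugin might encapsulate connection handling; the SQLiteDataSource class below is hypothetical and used only to illustrate the idea that the plugin owns the full connection lifecycle, so the user never manages connections directly.

import sqlite3
from typing import Any, Dict


class SQLiteDataSource:
    """Hypothetical plugin that owns the full connection lifecycle."""

    def __init__(self, config: Dict[str, Any]):
        self._config = config
        self._connection = None

    def connect(self) -> None:
        # Open the connection using only the user-supplied configuration.
        self._connection = sqlite3.connect(self._config["database"])

    def disconnect(self) -> None:
        if self._connection is not None:
            self._connection.close()
            self._connection = None

    def __enter__(self) -> "SQLiteDataSource":
        self.connect()
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.disconnect()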

Filtering

The API should allow data users to specify criteria to extract only certain data, such as a specific date range or rows where certain conditions are met. The Plugin Developer should therefore have a way to accept those criteria and handle them in the plugin.
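
One way a plugin could accept user-defined filter criteria is as part of its configuration. The sketch below assumes a hypothetical filter format (a field/operator/value triple); it is only an illustration of how the plugin side could apply such criteria.

from typing import Any, Dict, Iterable, List


def apply_filters(rows: Iterable[Dict[str, Any]],
                  filters: List[Dict[str, Any]]) -> Iterable[Dict[str, Any]]:
    """Yield only the rows matching every filter, e.g.
    {"field": "updated_at", "op": ">=", "value": "2023-01-01"}."""
    ops = {
        ">=": lambda a, b: a >= b,
        "<=": lambda a, b: a <= b,
        "==": lambda a, b: a == b,
    }
    for row in rows:
        if all(ops[f["op"]](row[f["field"]], f["value"]) for f in filters):
            yield row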

Incremental Extraction

To avoid extracting all data every time, the API should support incremental extraction, often based on a timestamp or a specific ID field. This can be done by offering the user parameters to specify the field and the last value extracted.

The point, though, is that the API should also provide abstractions that let the plugin developer handle this.
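
A minimal sketch of such an abstraction, assuming a hypothetical IngestionState store (not an existing VDK API) that remembers the last extracted value between runs.

from typing import Any, Dict, Iterable, List


class IngestionState:
    """Hypothetical key-value store persisted between job runs."""

    def __init__(self) -> None:
        self._state: Dict[str, Any] = {}

    def get_last_value(self, source: str) -> Any:
        return self._state.get(source)

    def set_last_value(self, source: str, value: Any) -> None:
        self._state[source] = value


def extract_incrementally(rows: List[Dict[str, Any]],
                          key_field: str,
                          state: IngestionState,
                          source: str) -> Iterable[Dict[str, Any]]:
    """Return only rows newer than the last value seen for this source."""
    last_value = state.get_last_value(source)
    new_rows = [r for r in rows if last_value is None or r[key_field] > last_value]
    if new_rows:
        state.set_last_value(source, max(r[key_field] for r in new_rows))
    return new_rows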

Parsing

Diverse Data Format Support

The API should support extraction in various data formats (JSON, CSV, XML, etc.).

Considering that multiple data sources can be parsed the same way - e.g. a Twitter source, a Salesforce source, and many more web apps as sources would likely return JSON - parsing should be decoupled from the data source itself.
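
A sketch of how parsing could be decoupled from the source, assuming a hypothetical Parser protocol so that the same JSONParser can be reused by a Twitter plugin, a Salesforce plugin, and so on.

import csv
import io
import json
from typing import Any, Dict, List, Protocol


class Parser(Protocol):
    def parse(self, raw: str) -> List[Dict[str, Any]]:
        ...


class JSONParser:
    def parse(self, raw: str) -> List[Dict[str, Any]]:
        data = json.loads(raw)
        return data if isinstance(data, list) else [data]


class CSVParser:
    def parse(self, raw: str) -> List[Dict[str, Any]]:
        return list(csv.DictReader(io.StringIO(raw)))


# Any source plugin can be configured with the parser matching its payload format.
parsers: Dict[str, Parser] = {"json": JSONParser(), "csv": CSVParser()}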

Data Source User API

Target Persona: Data User

As a data user who needs to ingest (extract) data from different sources, I want to use the built plugins for those sources so that extracting and ingesting the data is very easy (just configuration).
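
A sketch of what configuration-only usage might look like for the data user; the configuration keys and the commented-out vdk_ingest_source helper are hypothetical and only illustrate the intent.

# Hypothetical, configuration-only usage of a prebuilt data source plugin.
config = {
    "source": "salesforce",                 # which plugin to use
    "connection": {"auth_token": "${SF_TOKEN}"},
    "objects": ["Account", "Opportunity"],  # what to extract
    "incremental_key": "LastModifiedDate",  # how to extract only new data
    "destination_table": "crm_accounts",    # where to ingest it
}

# vdk_ingest_source(config)  # everything else is handled by the plugin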

Transformation

Re-mapping and other transformations

The API should provide an extension API for the user to apply simple transformations, but only if needed. It should be optional, and there should be a default mapping.
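
A sketch of an optional re-mapping hook with a sensible default, assuming a hypothetical map_record callback; when the user does not supply one, the identity mapping is applied.

from typing import Any, Callable, Dict, Iterable, Optional

Record = Dict[str, Any]


def ingest_records(records: Iterable[Record],
                   map_record: Optional[Callable[[Record], Record]] = None) -> Iterable[Record]:
    """Apply the user's mapping if provided; otherwise pass records through unchanged."""
    mapper = map_record if map_record is not None else (lambda r: r)
    for record in records:
        yield mapper(record)


# Example: rename columns only when the user actually needs to.
renamed = ingest_records(
    [{"Id": 1, "Name": "ACME"}],
    map_record=lambda r: {"account_id": r["Id"], "account_name": r["Name"]},
)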

Metadata and Metrics

There should be a Metrics API that allows an extension to define source-associated standard metrics and that calculates these metrics for the corresponding data source.
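
A sketch of what such a Metrics API could look like; the MetricDefinition class and the metric names are assumptions, not an existing VDK interface.

from dataclasses import dataclass
from typing import Any, Callable, Dict, Iterable, List


@dataclass
class MetricDefinition:
    """A named, source-associated metric with a function that computes it."""
    name: str
    compute: Callable[[List[Dict[str, Any]]], float]


STANDARD_METRICS = [
    MetricDefinition("row_count", lambda rows: float(len(rows))),
    MetricDefinition(
        "null_ratio",
        lambda rows: sum(v is None for r in rows for v in r.values())
        / max(1, sum(len(r) for r in rows)),
    ),
]


def calculate_metrics(rows: List[Dict[str, Any]],
                      metrics: Iterable[MetricDefinition] = STANDARD_METRICS) -> Dict[str, float]:
    return {m.name: m.compute(rows) for m in metrics}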

Metadata Extraction

Along with the actual data, users need access to metadata (like table schemas, attributes/columns, file sizes, etc.). Those statistics could be very valuable for data quality, data cleaning, and troubleshooting.
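
A sketch of a metadata structure a plugin could return alongside the data; the SourceMetadata class is a hypothetical illustration.

from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class SourceMetadata:
    """Descriptive information about the extracted dataset, returned with the data."""
    schema: Dict[str, str]              # column name -> type, e.g. {"id": "int"}
    row_count: Optional[int] = None
    size_bytes: Optional[int] = None
    extra: Dict[str, str] = field(default_factory=dict)


metadata = SourceMetadata(
    schema={"id": "int", "name": "string", "updated_at": "timestamp"},
    row_count=10_000,
    size_bytes=2_500_000,
)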

Error Handling

The API should handle errors gracefully and provide informative error messages, for example using HTTP status codes and error messages in the API response. It is also critical to handle failures with capabilities for error detection, retrying, and recovery.
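
A sketch of graceful failure handling with retries, assuming a hypothetical read_batch callable; exponential backoff with a bounded retry count is one common way to recover from transient failures.

import time
from typing import Any, Callable, List


def read_with_retries(read_batch: Callable[[], List[Any]],
                      max_attempts: int = 3,
                      base_delay_seconds: float = 1.0) -> List[Any]:
    """Retry a failing read with exponential backoff, re-raising after the last attempt."""
    for attempt in range(1, max_attempts + 1):
        try:
            return read_batch()
        except Exception as error:
            if attempt == max_attempts:
                raise
            delay = base_delay_seconds * 2 ** (attempt - 1)
            print(f"Read failed ({error!r}); retrying in {delay:.1f}s "
                  f"(attempt {attempt}/{max_attempts})")
            time.sleep(delay)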

Operation

Target Persona: Operator

Parallelism

The data to be extracted can be divided into logical partitions that can be processed independently, such as different tables in a database, different time ranges of data, or different geographical regions. The API could include parameters to specify the partitioning scheme for a given extraction job.
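
A sketch of partitioned, parallel extraction following the split/read shape from the Ideas section below; the PartitionedSource class and its partitioning-by-table scheme are hypothetical.

from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List


class PartitionedSource:
    """Hypothetical source whose data can be split into independent partitions."""

    def __init__(self, table_names: List[str]):
        self._tables = table_names

    def split(self, num_partitions: int) -> List[List[str]]:
        # Here a partition is simply a group of tables; it could also be a
        # time range or a geographical region.
        return [self._tables[i::num_partitions] for i in range(num_partitions)]

    def read(self, partition: List[str]) -> List[Dict[str, Any]]:
        return [{"table": t, "rows": []} for t in partition]


source = PartitionedSource(["accounts", "orders", "events", "users"])
with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(source.read, source.split(2)))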

Rate limiting and back pressure

Parallel ingestion can place a lot of load on both the source and target systems, so it is important to also include mechanisms for monitoring and managing system performance, such as rate limiting and backpressure.
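
A minimal sketch of client-side rate limiting using a simple interval-based limiter; the RateLimiter class is a hypothetical example and not an existing VDK component.

import time


class RateLimiter:
    """Allow at most `rate` operations per second, blocking callers that exceed it."""

    def __init__(self, rate: float):
        self._min_interval = 1.0 / rate
        self._next_allowed = time.monotonic()

    def acquire(self) -> None:
        now = time.monotonic()
        if now < self._next_allowed:
            time.sleep(self._next_allowed - now)
        self._next_allowed = max(now, self._next_allowed) + self._min_interval


limiter = RateLimiter(rate=5)  # at most 5 requests per second to the source
for batch in range(10):
    limiter.acquire()
    # fetch_batch(batch)  # hypothetical call to the source API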

Operability and troubleshootability

Monitoring and Logging

API should expose essential metrics and logs for real-time and post-analysis.

Audit Trails

API should keep logs for activities like data extraction, errors, and access records.

Alerting Mechanisms

Facilitate real-time alerts for high error rates or resource overuse.

Rollback and Versioning

Support for rollbacks and versioning for data and configurations should be present.

Self-Healing

It should be possible to build in auto-recovery for transient errors.

Documentation

Provide concise operator-specific documentation for troubleshooting and best practices.

Ideas

from abc import ABC, abstractmethod
from typing import Any, Dict, List, Iterable

class DataSource(ABC):
    """
    Abstract base class for a data source.
    """

    @abstractmethod
    def connect(self, config: Dict[str, Any]) -> None:
        """
        Establishes a connection to the data source.

        :param config: The connection configuration.
        """
        pass

    @abstractmethod
    def disconnect(self) -> None:
        """
        Disconnects from the data source.
        """
        pass

    @abstractmethod
    def read(self, partitions: List[Any], start_key: Any = None) -> Iterable[Dict[str, Any]]:
        """
        Reads data from the data source.

        :param partitions: The partitions to read from.
        :param start_key: The key from which to start reading.
        :return: The data read from the data source.
        """
        pass

    @abstractmethod
    def split(self, num_partitions: int) -> List[Any]:
        """
        Splits the data source into partitions.

        :param num_partitions: The number of partitions to split the data source into.
        :return: A list of partition identifiers.
        """
        pass

    @abstractmethod
    def get_incremental_key(self) -> Any:
        """
        Returns the key for incremental ingestion.

        :return: The key for incremental ingestion.
        """
        pass
