Run test from library user guide api-health, catalogs & custom-table-providers docs #14544

Open: wants to merge 15 commits into base: main
1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
@@ -153,6 +153,7 @@ serde_json = { workspace = true }
sysinfo = "0.33.1"
test-utils = { path = "../../test-utils" }
tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot", "fs"] }
+dashmap = "6.1.0"

[target.'cfg(not(target_os = "windows"))'.dev-dependencies]
nix = { version = "0.29.0", features = ["fs"] }
42 changes: 42 additions & 0 deletions datafusion/core/src/lib.rs
@@ -890,3 +890,45 @@ doc_comment::doctest!(
"../../../docs/source/library-user-guide/using-the-dataframe-api.md",
library_user_guide_dataframe_api
);

+#[cfg(doctest)]
+doc_comment::doctest!(
+"../../../docs/source/library-user-guide/api-health.md",
+library_user_guide_api_health
+);
+
+#[cfg(doctest)]
+doc_comment::doctest!(
+"../../../docs/source/library-user-guide/catalogs.md",
+library_user_guide_catalogs
+);
+
+#[cfg(doctest)]
+doc_comment::doctest!(
+"../../../docs/source/library-user-guide/custom-table-providers.md",
+library_user_guide_custom_table_providers
+);
+
+#[cfg(doctest)]
+doc_comment::doctest!(
+"../../../docs/source/library-user-guide/extending-operators.md",
+library_user_guide_extending_operators
+);
+
+#[cfg(doctest)]
+doc_comment::doctest!(
+"../../../docs/source/library-user-guide/extensions.md",
+library_user_guide_extensions
+);
+
+#[cfg(doctest)]
+doc_comment::doctest!(
+"../../../docs/source/library-user-guide/index.md",
+library_user_guide_index
+);
+
+#[cfg(doctest)]
+doc_comment::doctest!(
+"../../../docs/source/library-user-guide/profiling.md",
+library_user_guide_profiling
+);
4 changes: 2 additions & 2 deletions docs/source/library-user-guide/api-health.md
@@ -62,8 +62,8 @@ To mark the API as deprecated, use the `#[deprecated(since = "...", note = "..."
For example:

```rust
-#[deprecated(since = "41.0.0", note = "Use SessionStateBuilder")]
-pub fn new_with_config_rt(config: SessionConfig, runtime: Arc<RuntimeEnv>) -> Self
+#[deprecated(since = "41.0.0", note = "Use new API instead")]
+pub fn api_to_deprecated(a: usize, b: usize) {}
```

Deprecated methods will remain in the codebase for a period of 6 major versions or 6 months, whichever is longer, to provide users ample time to transition away from them.
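
As a hedged illustration of the policy above, the sketch below keeps a deprecated function as a thin wrapper around its replacement, so both entry points behave the same for the whole deprecation window. The names `sum_pair` and `sum_slice` are hypothetical and are not part of DataFusion; only the `#[deprecated(since = ..., note = ...)]` attribute comes from the guide.

```rust
/// Replacement API (hypothetical name, used only for illustration).
pub fn sum_slice(values: &[usize]) -> usize {
    values.iter().sum()
}

/// Old API kept during the deprecation window. It forwards to the
/// replacement so the two cannot drift apart before removal.
#[deprecated(since = "41.0.0", note = "Use `sum_slice` instead")]
pub fn sum_pair(a: usize, b: usize) -> usize {
    sum_slice(&[a, b])
}

fn main() {
    // Callers that have not migrated yet can silence the warning locally.
    #[allow(deprecated)]
    let old = sum_pair(2, 3);
    let new = sum_slice(&[2, 3]);
    assert_eq!(old, new);
    println!("old = {old}, new = {new}");
}
```

Forwarding from the old function to the new one is one possible design choice, not a project requirement; it keeps the deprecated path exercised by the same code until it is deleted.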
148 changes: 130 additions & 18 deletions docs/source/library-user-guide/catalogs.md
@@ -40,6 +40,11 @@ In the following example, we'll implement an in memory catalog, starting with th
The `MemorySchemaProvider` is a simple implementation of the `SchemaProvider` trait. It stores state (i.e. tables) in a `DashMap`, which then underlies the `SchemaProvider` trait.

```rust
+use std::sync::Arc;
+use dashmap::DashMap;
+use datafusion::catalog::{TableProvider, SchemaProvider};
+
+#[derive(Debug)]
pub struct MemorySchemaProvider {
tables: DashMap<String, Arc<dyn TableProvider>>,
}
@@ -50,6 +55,20 @@ pub struct MemorySchemaProvider {
Then we implement the `SchemaProvider` trait for `MemorySchemaProvider`.

```rust
+# use std::sync::Arc;
+# use dashmap::DashMap;
+# use datafusion::catalog::TableProvider;
+#
+# #[derive(Debug)]
+# pub struct MemorySchemaProvider {
+# tables: DashMap<String, Arc<dyn TableProvider>>,
+# }
+
+use std::any::Any;
+use datafusion::catalog::SchemaProvider;
+use async_trait::async_trait;
+use datafusion::common::{Result, exec_err};
+
#[async_trait]
impl SchemaProvider for MemorySchemaProvider {
fn as_any(&self) -> &dyn Any {
@@ -63,8 +82,8 @@ impl SchemaProvider for MemorySchemaProvider {
.collect()
}

-async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
-self.tables.get(name).map(|table| table.value().clone())
+async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
+Ok(self.tables.get(name).map(|table| table.value().clone()))
}

fn register_table(
@@ -93,12 +112,85 @@ impl SchemaProvider for MemorySchemaProvider {
Without getting into a `CatalogProvider` implementation, we can create a `MemorySchemaProvider` and register `TableProvider`s with it.

```rust
+# use std::sync::Arc;
+# use dashmap::DashMap;
+# use datafusion::catalog::TableProvider;
+#
+# #[derive(Debug)]
+# pub struct MemorySchemaProvider {
+# tables: DashMap<String, Arc<dyn TableProvider>>,
+# }
+#
+# use std::any::Any;
+# use datafusion::catalog::SchemaProvider;
+# use async_trait::async_trait;
+# use datafusion::common::{Result, exec_err};
+#
+# #[async_trait]
+# impl SchemaProvider for MemorySchemaProvider {
+# fn as_any(&self) -> &dyn Any {
+# self
+# }
+#
+# fn table_names(&self) -> Vec<String> {
+# self.tables
+# .iter()
+# .map(|table| table.key().clone())
+# .collect()
+# }
+#
+# async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
+# Ok(self.tables.get(name).map(|table| table.value().clone()))
+# }
+#
+# fn register_table(
+# &self,
+# name: String,
+# table: Arc<dyn TableProvider>,
+# ) -> Result<Option<Arc<dyn TableProvider>>> {
+# if self.table_exist(name.as_str()) {
+# return exec_err!(
+# "The table {name} already exists"
+# );
+# }
+# Ok(self.tables.insert(name, table))
+# }
+#
+# fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
+# Ok(self.tables.remove(name).map(|(_, table)| table))
+# }
+#
+# fn table_exist(&self, name: &str) -> bool {
+# self.tables.contains_key(name)
+# }
+# }
+
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use arrow::record_batch::RecordBatch;
+use datafusion::datasource::MemTable;
+use arrow::array::{self, Array, ArrayRef, Int32Array};
+
+impl MemorySchemaProvider {
+/// Instantiates a new MemorySchemaProvider with an empty collection of tables.
+pub fn new() -> Self {
+Self {
+tables: DashMap::new(),
+}
+}
+}
+
let schema_provider = Arc::new(MemorySchemaProvider::new());
-let table_provider = _; // create a table provider

-schema_provider.register_table("table_name".to_string(), table_provider);
+let table_provider = {
+let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
+let arr = Arc::new(Int32Array::from((1..=1).collect::<Vec<_>>()));
+let partitions = vec![vec![RecordBatch::try_new(schema.clone(), vec![arr as ArrayRef]).unwrap()]];
+Arc::new(MemTable::try_new(schema, partitions).unwrap())
+};
+
+schema_provider.register_table("users".to_string(), table_provider);

-let table = schema_provider.table("table_name").unwrap();
+let table = schema_provider.table("users");
```
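
The register, look up, and deregister flow shown above can also be sketched with only the standard library, which may help when reasoning about the semantics without pulling in DataFusion. This is a simplified stand-in under stated assumptions: `RwLock<HashMap<..>>` replaces `DashMap`, errors are plain `String`s rather than DataFusion errors, and `Table`, `FixedTable`, and `SchemaRegistry` are hypothetical names that do not exist in DataFusion.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for `TableProvider`; it only reports a row count.
pub trait Table: Send + Sync {
    fn row_count(&self) -> usize;
}

/// Trivial table with a fixed number of rows.
pub struct FixedTable(pub usize);

impl Table for FixedTable {
    fn row_count(&self) -> usize {
        self.0
    }
}

/// Simplified schema-level registry: table name -> shared table handle.
#[derive(Default)]
pub struct SchemaRegistry {
    tables: RwLock<HashMap<String, Arc<dyn Table>>>,
}

impl SchemaRegistry {
    /// Registers a table, refusing to overwrite an existing name.
    pub fn register_table(
        &self,
        name: String,
        table: Arc<dyn Table>,
    ) -> Result<Option<Arc<dyn Table>>, String> {
        // The write lock is held across the check and the insert.
        let mut tables = self.tables.write().unwrap();
        if tables.contains_key(&name) {
            return Err(format!("The table {name} already exists"));
        }
        Ok(tables.insert(name, table))
    }

    /// Removes a table and returns it if it was present.
    pub fn deregister_table(&self, name: &str) -> Option<Arc<dyn Table>> {
        self.tables.write().unwrap().remove(name)
    }

    /// Looks up a table by name, cloning the shared handle.
    pub fn table(&self, name: &str) -> Option<Arc<dyn Table>> {
        self.tables.read().unwrap().get(name).cloned()
    }

    /// Lists the registered table names (order is unspecified).
    pub fn table_names(&self) -> Vec<String> {
        self.tables.read().unwrap().keys().cloned().collect()
    }
}

fn main() {
    let schema = SchemaRegistry::default();
    schema
        .register_table("users".to_string(), Arc::new(FixedTable(1)))
        .unwrap();

    // A second registration under the same name is rejected.
    assert!(schema
        .register_table("users".to_string(), Arc::new(FixedTable(2)))
        .is_err());

    assert_eq!(schema.table("users").map(|t| t.row_count()), Some(1));
    assert!(schema.deregister_table("users").is_some());
    assert!(schema.table("users").is_none());
}
```

Unlike the guide's `table` method, the lookup here is synchronous, so no `.await` is involved; that is a simplification of this sketch, not a property of the `SchemaProvider` trait.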

### Asynchronous `SchemaProvider`
@@ -108,27 +200,44 @@ It's often useful to fetch metadata about which tables are in a schema, from a r
The trait is roughly the same except for the `table` method, and the addition of the `#[async_trait]` attribute.

```rust
+# use async_trait::async_trait;
+# use std::sync::Arc;
+# use datafusion::catalog::{TableProvider, SchemaProvider};
+# use datafusion::common::Result;
+#
+# type OriginSchema = arrow::datatypes::Schema;
+#
+# #[derive(Debug)]
+# struct Schema(OriginSchema);
+
#[async_trait]
impl SchemaProvider for Schema {
-async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
-// fetch metadata from remote source
+async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
+# todo!();
}
+
+# fn as_any(&self) -> &(dyn std::any::Any + 'static) { todo!() }
+# fn table_names(&self) -> Vec<std::string::String> { todo!() }
+# fn table_exist(&self, _: &str) -> bool { todo!() }
}
```

## Implementing `MemoryCatalogProvider`

-As mentioned, the `CatalogProvider` can manage the schemas in a catalog, and the `MemoryCatalogProvider` is a simple implementation of the `CatalogProvider` trait. It stores schemas in a `DashMap`.
+As mentioned, the `CatalogProvider` can manage the schemas in a catalog, and the `MemoryCatalogProvider` is a simple implementation of the `CatalogProvider` trait. It stores schemas in a `DashMap`. With that the `CatalogProvider` trait can be implemented.

```rust
+use std::any::Any;
+use std::sync::Arc;
+use dashmap::DashMap;
+use datafusion::catalog::{CatalogProvider, SchemaProvider};
+use datafusion::common::Result;
+
+#[derive(Debug)]
pub struct MemoryCatalogProvider {
schemas: DashMap<String, Arc<dyn SchemaProvider>>,
}
-```

-With that the `CatalogProvider` trait can be implemented.
-
-```rust
impl CatalogProvider for MemoryCatalogProvider {
fn as_any(&self) -> &dyn Any {
self
@@ -167,20 +276,23 @@ impl CatalogProvider for MemoryCatalogProvider {
}
```

-Again, this is fairly straightforward, as there's an underlying data structure to store the state, via key-value pairs.
-
+Again, this is fairly straightforward, as there's an underlying data structure to store the state, via key-value pairs. With that the `CatalogProviderList` trait can be implemented.
## Implementing `MemoryCatalogProviderList`

```rust

+use std::any::Any;
+use std::sync::Arc;
+use dashmap::DashMap;
+use datafusion::catalog::{CatalogProviderList, CatalogProvider};
+use datafusion::common::Result;
+
+#[derive(Debug)]
pub struct MemoryCatalogProviderList {
/// Collection of catalogs containing schemas and ultimately TableProviders
pub catalogs: DashMap<String, Arc<dyn CatalogProvider>>,
}
-```

-With that the `CatalogProviderList` trait can be implemented.
-
-```rust
impl CatalogProviderList for MemoryCatalogProviderList {
fn as_any(&self) -> &dyn Any {
self