Skip to content

Commit

Permalink
Replace BallistaContext with SessionContext (#1088)
Browse files Browse the repository at this point in the history
* Initial SessionContextExt skeleton

relates to #1081

* add few more tests ...

to find missing functionalities, and verify it
`SessionContextExt` will not fail any of the tests
for `BallistaContext`

* Detect if LogicalPlan is scanning information schema

... if it does, we will use `DefaultPhysicalPlanner`
and execute query locally.

* change extension interface, simplifying it

* Change SessionContextExt interface ...

... add more tests

* update rustdocs

* remote methods accept `url` ...

... it would be easier to add security later.

* remove config option for now ...

... would add them in next commits, once i get
better idea about them.

* debug failed windows test

* remove `standalone` from default features in client

* fix clippy in tests

* fix formatting as well
  • Loading branch information
milenkovicm authored Oct 21, 2024
1 parent 8bbd998 commit 92ce301
Show file tree
Hide file tree
Showing 15 changed files with 1,779 additions and 34 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ filtered_rat.txt
arrow-src.tar
arrow-src.tar.gz
CHANGELOG.md.bak
Cargo.toml.bak

# Compiled source
*.a
Expand Down
6 changes: 6 additions & 0 deletions ballista/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ edition = "2021"
rust-version = "1.72"

[dependencies]
async-trait = { workspace = true }
ballista-core = { path = "../core", version = "0.12.0" }
ballista-executor = { path = "../executor", version = "0.12.0", optional = true }
ballista-scheduler = { path = "../scheduler", version = "0.12.0", optional = true }
Expand All @@ -39,6 +40,11 @@ parking_lot = { workspace = true }
sqlparser = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true }
url = { version = "2.5" }

[dev-dependencies]
ctor = { version = "0.2" }
env_logger = { workspace = true }

[features]
azure = ["ballista-core/azure"]
Expand Down
1 change: 1 addition & 0 deletions ballista/client/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ impl BallistaContextState {
}
}

// #[deprecated]
pub struct BallistaContext {
state: Arc<Mutex<BallistaContextState>>,
context: Arc<SessionContext>,
Expand Down
203 changes: 203 additions & 0 deletions ballista/client/src/extension.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use ballista_core::{
config::BallistaConfig,
serde::protobuf::{
scheduler_grpc_client::SchedulerGrpcClient, CreateSessionParams, KeyValuePair,
},
utils::{create_df_ctx_with_ballista_query_planner, create_grpc_client_connection},
};
use datafusion::{error::DataFusionError, prelude::SessionContext};
use datafusion_proto::protobuf::LogicalPlanNode;
use url::Url;

/// Scheduler port used by `SessionContextExt::remote` when the URL it is
/// given does not carry an explicit port.
const DEFAULT_SCHEDULER_PORT: u16 = 50050;

/// Extension trait which adds the `standalone` and `remote` constructors
/// to [SessionContext].
///
/// Provided methods set up [SessionContext] with [BallistaQueryPlanner](ballista_core::utils), which
/// handles running plans on Ballista clusters.
///
///```no_run
/// use ballista::prelude::SessionContextExt;
/// use datafusion::prelude::SessionContext;
///
/// # #[tokio::main]
/// # async fn main() -> datafusion::error::Result<()> {
/// let ctx: SessionContext = SessionContext::remote("df://localhost:50050").await?;
/// # Ok(())
/// # }
///```
///
/// [SessionContextExt::standalone()] provides an easy way to start up
/// a local cluster. It is an optional feature which has to be enabled
/// with the `standalone` cargo feature.
///
///```no_run
/// use ballista::prelude::SessionContextExt;
/// use datafusion::prelude::SessionContext;
///
/// # #[tokio::main]
/// # async fn main() -> datafusion::error::Result<()> {
/// let ctx: SessionContext = SessionContext::standalone().await?;
/// # Ok(())
/// # }
///```
///
/// There are still a few limitations on query distribution, thus not all
/// [SessionContext] functionalities are supported.
///
#[async_trait::async_trait]
pub trait SessionContextExt {
/// Create a context for executing queries against a standalone Ballista scheduler instance.
/// It will start a local Ballista cluster with a scheduler and an executor.
///
/// Only available when the `standalone` feature is enabled.
#[cfg(feature = "standalone")]
async fn standalone() -> datafusion::error::Result<SessionContext>;

/// Create a context for executing queries against a remote Ballista scheduler instance.
///
/// `url` identifies the scheduler, e.g. `df://localhost:50050`. The
/// implementation for [SessionContext] in this file requires the URL to
/// contain a host and falls back to port 50050 when no port is given.
async fn remote(url: &str) -> datafusion::error::Result<SessionContext>;
}

#[async_trait::async_trait]
impl SessionContextExt for SessionContext {
async fn remote(url: &str) -> datafusion::error::Result<SessionContext> {
let url =
Url::parse(url).map_err(|e| DataFusionError::Configuration(e.to_string()))?;
let host = url.host().ok_or(DataFusionError::Configuration(
"hostname should be provided".to_string(),
))?;
let port = url.port().unwrap_or(DEFAULT_SCHEDULER_PORT);
let scheduler_url = format!("http://{}:{}", &host, port);
log::info!(
"Connecting to Ballista scheduler at {}",
scheduler_url.clone()
);
let connection = create_grpc_client_connection(scheduler_url.clone())
.await
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;

let config = BallistaConfig::builder()
.build()
.map_err(|e| DataFusionError::Configuration(e.to_string()))?;

let limit = config.default_grpc_client_max_message_size();
let mut scheduler = SchedulerGrpcClient::new(connection)
.max_encoding_message_size(limit)
.max_decoding_message_size(limit);

let remote_session_id = scheduler
.create_session(CreateSessionParams {
settings: config
.settings()
.iter()
.map(|(k, v)| KeyValuePair {
key: k.to_owned(),
value: v.to_owned(),
})
.collect::<Vec<_>>(),
})
.await
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?
.into_inner()
.session_id;

log::info!(
"Server side SessionContext created with session id: {}",
remote_session_id
);

let ctx = {
create_df_ctx_with_ballista_query_planner::<LogicalPlanNode>(
scheduler_url,
remote_session_id,
&config,
)
};

Ok(ctx)
}

#[cfg(feature = "standalone")]
async fn standalone() -> datafusion::error::Result<Self> {
use ballista_core::serde::BallistaCodec;
use datafusion_proto::protobuf::PhysicalPlanNode;

log::info!("Running in local mode. Scheduler will be run in-proc");

let addr = ballista_scheduler::standalone::new_standalone_scheduler()
.await
.map_err(|e| DataFusionError::Configuration(e.to_string()))?;

let scheduler_url = format!("http://localhost:{}", addr.port());
let mut scheduler = loop {
match SchedulerGrpcClient::connect(scheduler_url.clone()).await {
Err(_) => {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
log::info!("Attempting to connect to in-proc scheduler...");
}
Ok(scheduler) => break scheduler,
}
};
let config = BallistaConfig::builder()
.build()
.map_err(|e| DataFusionError::Configuration(e.to_string()))?;
let remote_session_id = scheduler
.create_session(CreateSessionParams {
settings: config
.settings()
.iter()
.map(|(k, v)| KeyValuePair {
key: k.to_owned(),
value: v.to_owned(),
})
.collect::<Vec<_>>(),
})
.await
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?
.into_inner()
.session_id;

log::info!(
"Server side SessionContext created with session id: {}",
remote_session_id
);

let ctx = {
create_df_ctx_with_ballista_query_planner::<LogicalPlanNode>(
scheduler_url,
remote_session_id,
&config,
)
};

let default_codec: BallistaCodec<LogicalPlanNode, PhysicalPlanNode> =
BallistaCodec::default();

let concurrent_tasks = config.default_standalone_parallelism();
ballista_executor::new_standalone_executor(
scheduler,
concurrent_tasks,
default_codec,
)
.await
.map_err(|e| DataFusionError::Configuration(e.to_string()))?;

Ok(ctx)
}
}
1 change: 1 addition & 0 deletions ballista/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@
#![doc = include_str!("../README.md")]

pub mod context;
pub mod extension;
pub mod prelude;
3 changes: 2 additions & 1 deletion ballista/client/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ pub use ballista_core::{
BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, BALLISTA_GRPC_CLIENT_MAX_MESSAGE_SIZE,
BALLISTA_JOB_NAME, BALLISTA_PARQUET_PRUNING, BALLISTA_REPARTITION_AGGREGATIONS,
BALLISTA_REPARTITION_JOINS, BALLISTA_REPARTITION_WINDOWS,
BALLISTA_WITH_INFORMATION_SCHEMA,
BALLISTA_STANDALONE_PARALLELISM, BALLISTA_WITH_INFORMATION_SCHEMA,
},
error::{BallistaError, Result},
};

pub use futures::StreamExt;

pub use crate::context::BallistaContext;
pub use crate::extension::SessionContextExt;
Loading

0 comments on commit 92ce301

Please sign in to comment.