diff --git a/CHANGELOG.md b/CHANGELOG.md index 43d957c3..1d3e8c74 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,9 @@ All notable changes to this project will be documented in this file. - Cpu and memory limits are now configurable ([#298]). - Stale resources are now deleted ([#310]). - Support Druid 24.0.0 ([#317]). +- Added `segmentCacheSizeGb` to make segment cache (the max size of `druid.segmentCache.locations`) configurable ([#320]) +- Refactor role configuration with per role structs like `BrokerConfig`, `HistoricalConfig`, etc ([#320]) +- Added `HistoricalStorage` and `DruidStorage` (as catch-all storage configuration) ([#320]) ### Changed @@ -17,6 +20,7 @@ All notable changes to this project will be documented in this file. [#298]: https://github.com/stackabletech/druid-operator/pull/298 [#310]: https://github.com/stackabletech/druid-operator/pull/310 [#317]: https://github.com/stackabletech/druid-operator/pull/317 +[#320]: https://github.com/stackabletech/druid-operator/pull/320 ## [0.7.0] - 2022-09-06 diff --git a/Cargo.lock b/Cargo.lock index 5db211a1..c0e5101b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -598,6 +598,12 @@ version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2ffb393ac5d9a6eaa9d3fdf37ae2776656b706e200c8e16b1bdb227f5198e6ea" +[[package]] +name = "futures-timer" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" + [[package]] name = "futures-util" version = "0.3.25" @@ -1530,6 +1536,40 @@ dependencies = [ "winapi", ] +[[package]] +name = "rstest" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9c9dc66cc29792b663ffb5269be669f1613664e69ad56441fdb895c2347b930" +dependencies = [ + "futures 0.3.25", + "futures-timer", + "rstest_macros", + "rustc_version", +] + +[[package]] +name = "rstest_macros" +version = "0.14.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "5015e68a0685a95ade3eee617ff7101ab6a3fc689203101ca16ebc16f2b89c66" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "rustc_version", + "syn", +] + +[[package]] +name = "rustc_version" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" +dependencies = [ + "semver", +] + [[package]] name = "rustversion" version = "1.0.9" @@ -1777,6 +1817,8 @@ dependencies = [ name = "stackable-druid-crd" version = "0.8.0-nightly" dependencies = [ + "lazy_static", + "rstest", "semver", "serde", "serde_json", @@ -1798,6 +1840,7 @@ dependencies = [ "futures 0.3.25", "indoc", "pin-project", + "rstest", "semver", "serde", "serde_json", diff --git a/deploy/config-spec/properties.yaml b/deploy/config-spec/properties.yaml index 7b2df0b2..dc4baf24 100644 --- a/deploy/config-spec/properties.yaml +++ b/deploy/config-spec/properties.yaml @@ -722,7 +722,7 @@ properties: type: "string" defaultValues: - fromVersion: "0.0.0" - value: "[{\"path\":\"/stackable/var/druid/segment-cache\",\"maxSize\":\"300g\"}]" + value: "[{\"path\":\"/stackable/var/druid/segment-cache\",\"maxSize\":\"900m\"}]" roles: - name: "broker" required: false diff --git a/deploy/crd/druidcluster.crd.yaml b/deploy/crd/druidcluster.crd.yaml index 1ec981d8..b8b80195 100644 --- a/deploy/crd/druidcluster.crd.yaml +++ b/deploy/crd/druidcluster.crd.yaml @@ -152,6 +152,7 @@ spec: type: object type: object storage: + description: Storage configuration used by all roles except historical type: object type: object type: object @@ -298,6 +299,7 @@ spec: type: object type: object storage: + description: Storage configuration used by all roles except historical type: object type: object type: object @@ -484,6 +486,7 @@ spec: type: object type: object storage: + description: Storage configuration used by all roles except historical type: object type: 
object type: object @@ -630,6 +633,7 @@ spec: type: object type: object storage: + description: Storage configuration used by all roles except historical type: object type: object type: object @@ -959,6 +963,12 @@ spec: type: object type: object storage: + properties: + segmentCacheSizeGb: + format: uint16 + minimum: 0.0 + nullable: true + type: integer type: object type: object type: object @@ -1105,6 +1115,12 @@ spec: type: object type: object storage: + properties: + segmentCacheSizeGb: + format: uint16 + minimum: 0.0 + nullable: true + type: integer type: object type: object type: object @@ -1422,6 +1438,7 @@ spec: type: object type: object storage: + description: Storage configuration used by all roles except historical type: object type: object type: object @@ -1568,6 +1585,7 @@ spec: type: object type: object storage: + description: Storage configuration used by all roles except historical type: object type: object type: object @@ -1765,6 +1783,7 @@ spec: type: object type: object storage: + description: Storage configuration used by all roles except historical type: object type: object type: object @@ -1911,6 +1930,7 @@ spec: type: object type: object storage: + description: Storage configuration used by all roles except historical type: object type: object type: object diff --git a/deploy/helm/druid-operator/configs/properties.yaml b/deploy/helm/druid-operator/configs/properties.yaml index 7b2df0b2..dc4baf24 100644 --- a/deploy/helm/druid-operator/configs/properties.yaml +++ b/deploy/helm/druid-operator/configs/properties.yaml @@ -722,7 +722,7 @@ properties: type: "string" defaultValues: - fromVersion: "0.0.0" - value: "[{\"path\":\"/stackable/var/druid/segment-cache\",\"maxSize\":\"300g\"}]" + value: "[{\"path\":\"/stackable/var/druid/segment-cache\",\"maxSize\":\"900m\"}]" roles: - name: "broker" required: false diff --git a/deploy/helm/druid-operator/crds/crds.yaml b/deploy/helm/druid-operator/crds/crds.yaml index 09fe27c6..a1e2ecd2 100644 --- 
a/deploy/helm/druid-operator/crds/crds.yaml +++ b/deploy/helm/druid-operator/crds/crds.yaml @@ -61,6 +61,7 @@ spec: type: object type: object storage: + description: Storage configuration used by all roles except historical type: object type: object type: object @@ -114,6 +115,7 @@ spec: type: object type: object storage: + description: Storage configuration used by all roles except historical type: object type: object type: object @@ -207,6 +209,7 @@ spec: type: object type: object storage: + description: Storage configuration used by all roles except historical type: object type: object type: object @@ -260,6 +263,7 @@ spec: type: object type: object storage: + description: Storage configuration used by all roles except historical type: object type: object type: object @@ -496,6 +500,12 @@ spec: type: object type: object storage: + properties: + segmentCacheSizeGb: + format: uint16 + minimum: 0.0 + nullable: true + type: integer type: object type: object type: object @@ -549,6 +559,12 @@ spec: type: object type: object storage: + properties: + segmentCacheSizeGb: + format: uint16 + minimum: 0.0 + nullable: true + type: integer type: object type: object type: object @@ -773,6 +789,7 @@ spec: type: object type: object storage: + description: Storage configuration used by all roles except historical type: object type: object type: object @@ -826,6 +843,7 @@ spec: type: object type: object storage: + description: Storage configuration used by all roles except historical type: object type: object type: object @@ -930,6 +948,7 @@ spec: type: object type: object storage: + description: Storage configuration used by all roles except historical type: object type: object type: object @@ -983,6 +1002,7 @@ spec: type: object type: object storage: + description: Storage configuration used by all roles except historical type: object type: object type: object diff --git a/deploy/manifests/configmap.yaml b/deploy/manifests/configmap.yaml index 8927d6e5..2ee1ea2c 100644 --- 
a/deploy/manifests/configmap.yaml +++ b/deploy/manifests/configmap.yaml @@ -726,7 +726,7 @@ data: type: "string" defaultValues: - fromVersion: "0.0.0" - value: "[{\"path\":\"/stackable/var/druid/segment-cache\",\"maxSize\":\"300g\"}]" + value: "[{\"path\":\"/stackable/var/druid/segment-cache\",\"maxSize\":\"900m\"}]" roles: - name: "broker" required: false diff --git a/deploy/manifests/crds.yaml b/deploy/manifests/crds.yaml index cc2b67d1..66dbd9a2 100644 --- a/deploy/manifests/crds.yaml +++ b/deploy/manifests/crds.yaml @@ -62,6 +62,7 @@ spec: type: object type: object storage: + description: Storage configuration used by all roles except historical type: object type: object type: object @@ -115,6 +116,7 @@ spec: type: object type: object storage: + description: Storage configuration used by all roles except historical type: object type: object type: object @@ -208,6 +210,7 @@ spec: type: object type: object storage: + description: Storage configuration used by all roles except historical type: object type: object type: object @@ -261,6 +264,7 @@ spec: type: object type: object storage: + description: Storage configuration used by all roles except historical type: object type: object type: object @@ -497,6 +501,12 @@ spec: type: object type: object storage: + properties: + segmentCacheSizeGb: + format: uint16 + minimum: 0.0 + nullable: true + type: integer type: object type: object type: object @@ -550,6 +560,12 @@ spec: type: object type: object storage: + properties: + segmentCacheSizeGb: + format: uint16 + minimum: 0.0 + nullable: true + type: integer type: object type: object type: object @@ -774,6 +790,7 @@ spec: type: object type: object storage: + description: Storage configuration used by all roles except historical type: object type: object type: object @@ -827,6 +844,7 @@ spec: type: object type: object storage: + description: Storage configuration used by all roles except historical type: object type: object type: object @@ -931,6 +949,7 @@ spec: type: 
object type: object storage: + description: Storage configuration used by all roles except historical type: object type: object type: object @@ -984,6 +1003,7 @@ spec: type: object type: object storage: + description: Storage configuration used by all roles except historical type: object type: object type: object diff --git a/docs/modules/ROOT/pages/usage.adoc b/docs/modules/ROOT/pages/usage.adoc index 710e6628..503b4754 100644 --- a/docs/modules/ROOT/pages/usage.adoc +++ b/docs/modules/ROOT/pages/usage.adoc @@ -401,3 +401,29 @@ brokers: WARNING: The default values are _most likely_ not sufficient to run a proper cluster in production. Please adapt according to your requirements. For more details regarding Kubernetes CPU limits see: https://kubernetes.io/docs/tasks/configure-pod-container/assign-cpu-resource/[Assign CPU Resources to Containers and Pods]. + +==== Historical Resources + +In addition to the cpu and memory resources described above, historical Pods also accept a `storage` resource with the following properties: + +* `segmentCacheSizeGb` - used to set the maximum size allowed for the historical segment cache locations. See the Druid documentation regarding https://druid.apache.org/docs/latest/configuration/index.html#historical[druid.segmentCache.locations]. As the name suggests, the cache size is assumed to be in gigabytes. The operator creates an `emptyDir` with the exact value specified here and sets the `maxSize` of the Druid property to be 10% less than that! By default, if no `segmentCacheSizeGb` is configured, the operator will create an `emptyDir` with a size of `1G` and a `maxSize` of `900M` for the segment cache property. 
+ +Example historical configuration with storage resources: + +[source,yaml] +---- +historicals: + roleGroups: + default: + config: + resources: + cpu: + min: '200m' + max: "4" + memory: + limit: '2Gi' + storage: + # Generates an emptyDir of size 3G + # and sets the Druid maxSize segment cache to 2765M + segmentCacheSizeGb: 3 +---- diff --git a/examples/psql/psql-hdfs-druid-cluster.yaml b/examples/psql/psql-hdfs-druid-cluster.yaml index d18c9be6..803d81af 100644 --- a/examples/psql/psql-hdfs-druid-cluster.yaml +++ b/examples/psql/psql-hdfs-druid-cluster.yaml @@ -100,7 +100,15 @@ spec: selector: matchLabels: kubernetes.io/os: linux - config: {} + config: + resources: + cpu: + min: '200m' + max: '4' + memory: + limit: '2Gi' + storage: + segmentCacheSizeGb: 2 replicas: 1 middleManagers: roleGroups: diff --git a/rust/crd/Cargo.toml b/rust/crd/Cargo.toml index d30b2731..2e4515e2 100644 --- a/rust/crd/Cargo.toml +++ b/rust/crd/Cargo.toml @@ -17,6 +17,8 @@ serde_json = "1.0" strum = { version = "0.24", features = ["derive"] } tracing = "0.1" snafu = "0.7" +lazy_static = "1.4" [dev-dependencies] serde_yaml = "0.8" +rstest = "0.15" diff --git a/rust/crd/src/lib.rs b/rust/crd/src/lib.rs index c9f3d272..61f2dbc4 100644 --- a/rust/crd/src/lib.rs +++ b/rust/crd/src/lib.rs @@ -1,22 +1,25 @@ +pub mod resource; +pub mod storage; + use serde::{Deserialize, Serialize}; use snafu::{ResultExt, Snafu}; + +use stackable_operator::product_config::types::PropertyNameKind; use stackable_operator::role_utils::RoleGroupRef; use stackable_operator::{ client::Client, commons::{ opa::OpaConfig, - resources::{CpuLimits, MemoryLimits, NoRuntimeLimits, Resources}, + resources::{NoRuntimeLimits, Resources}, s3::{InlinedS3BucketSpec, S3BucketDef, S3ConnectionDef, S3ConnectionSpec}, tls::{CaCert, Tls, TlsServerVerification, TlsVerification}, }, - config::merge::Merge, - k8s_openapi::apimachinery::pkg::api::resource::Quantity, kube::{CustomResource, ResourceExt}, product_config_utils::{ConfigError, 
Configuration}, role_utils::Role, schemars::{self, JsonSchema}, }; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use std::str::FromStr; use strum::{Display, EnumDiscriminants, EnumIter, EnumString, IntoStaticStr}; @@ -96,6 +99,10 @@ const ENV_S3_ACCESS_KEY: &str = "AWS_ACCESS_KEY_ID"; const ENV_S3_SECRET_KEY: &str = "AWS_SECRET_ACCESS_KEY"; const SECRET_KEY_S3_ACCESS_KEY: &str = "accessKey"; const SECRET_KEY_S3_SECRET_KEY: &str = "secretKey"; +// segment storage +pub const SC_LOCATIONS: &str = "druid.segmentCache.locations"; +pub const SC_DIRECTORY: &str = "/stackable/var/druid/segment-cache"; +pub const SC_VOLUME_NAME: &str = "segment-cache"; #[derive(Snafu, Debug, EnumDiscriminants)] #[strum_discriminants(derive(IntoStaticStr))] @@ -113,6 +120,8 @@ pub enum Error { IncompatibleS3Connections, #[snafu(display("Unknown Druid role found {role}. Should be one of {roles:?}"))] UnknownDruidRole { role: String, roles: Vec }, + #[snafu(display("failed to merge resource definitions"))] + ResourceMerge { source: resource::Error }, } #[derive(Clone, CustomResource, Debug, Deserialize, JsonSchema, Serialize)] @@ -137,11 +146,11 @@ pub struct DruidClusterSpec { pub stopped: Option, /// Desired Druid version pub version: String, - pub brokers: Role, - pub coordinators: Role, - pub historicals: Role, - pub middle_managers: Role, - pub routers: Role, + pub brokers: Role, + pub coordinators: Role, + pub historicals: Role, + pub middle_managers: Role, + pub routers: Role, pub metadata_storage_database: DatabaseConnectionSpec, pub deep_storage: DeepStorageSpec, pub ingestion: Option, @@ -249,15 +258,185 @@ impl DruidRole { } impl DruidCluster { - /// The spec for the given Role - pub fn get_role(&self, role: &DruidRole) -> &Role { - match role { - DruidRole::Coordinator => &self.spec.coordinators, - DruidRole::Broker => &self.spec.brokers, - DruidRole::MiddleManager => &self.spec.middle_managers, - DruidRole::Historical => &self.spec.historicals, - 
DruidRole::Router => &self.spec.routers, + pub fn common_compute_files( + &self, + role_name: &str, + file: &str, + ) -> Result>, ConfigError> { + let role = DruidRole::from_str(role_name).unwrap(); + + let mut result = BTreeMap::new(); + match file { + JVM_CONFIG => {} + RUNTIME_PROPS => { + // Plaintext port + result.insert( + PLAINTEXT.to_string(), + Some(role.get_http_port().to_string()), + ); + // extensions + let mut extensions = vec![ + String::from(EXT_KAFKA_INDEXING), + String::from(EXT_DATASKETCHES), + String::from(PROMETHEUS_EMITTER), + String::from(EXT_BASIC_SECURITY), + String::from(EXT_OPA_AUTHORIZER), + String::from(EXT_HDFS), + ]; + // metadata storage + let mds = self.spec.metadata_storage_database.clone(); + match mds.db_type { + DbType::Derby => {} // no additional extensions required + DbType::Postgresql => extensions.push(EXT_PSQL_MD_ST.to_string()), + DbType::Mysql => extensions.push(EXT_MYSQL_MD_ST.to_string()), + } + result.insert(MD_ST_TYPE.to_string(), Some(mds.db_type.to_string())); + result.insert( + MD_ST_CONNECT_URI.to_string(), + Some(mds.conn_string.to_string()), + ); + result.insert(MD_ST_HOST.to_string(), Some(mds.host.to_string())); + result.insert(MD_ST_PORT.to_string(), Some(mds.port.to_string())); + if let Some(user) = &mds.user { + result.insert(MD_ST_USER.to_string(), Some(user.to_string())); + } + if let Some(password) = &mds.password { + result.insert(MD_ST_PASSWORD.to_string(), Some(password.to_string())); + } + // s3 + if self.uses_s3() { + extensions.push(EXT_S3.to_string()); + } + // OPA + if let Some(_opa) = self.spec.opa.clone() { + result.insert( + AUTH_AUTHORIZERS.to_string(), + Some(AUTH_AUTHORIZERS_VALUE.to_string()), + ); + result.insert( + AUTH_AUTHORIZER_OPA_TYPE.to_string(), + Some(AUTH_AUTHORIZER_OPA_TYPE_VALUE.to_string()), + ); + // The opaUri still needs to be set, but that requires a discovery config map and is handled in the druid_controller.rs + } + // deep storage + result.insert( + 
DS_TYPE.to_string(), + Some(self.spec.deep_storage.to_string()), + ); + match self.spec.deep_storage.clone() { + DeepStorageSpec::HDFS(hdfs) => { + result.insert(DS_DIRECTORY.to_string(), Some(hdfs.directory)); + } + DeepStorageSpec::S3(s3_spec) => { + if let Some(key) = &s3_spec.base_key { + result.insert(DS_BASE_KEY.to_string(), Some(key.to_string())); + } + // bucket information (name, connection) needs to be resolved first, + // that is done directly in the controller + } + } + // other + result.insert( + EXTENSIONS_LOADLIST.to_string(), + Some(build_string_list(&extensions)), + ); + // metrics + result.insert( + PROMETHEUS_PORT.to_string(), + Some(DRUID_METRICS_PORT.to_string()), + ); + } + LOG4J2_CONFIG => {} + _ => {} } + + Ok(result) + } + + pub fn replicas(&self, rolegroup_ref: &RoleGroupRef) -> Option { + match DruidRole::from_str(rolegroup_ref.role.as_str()).unwrap() { + DruidRole::Broker => self + .spec + .brokers + .role_groups + .get(&rolegroup_ref.role_group) + .and_then(|rg| rg.replicas) + .map(i32::from), + DruidRole::MiddleManager => self + .spec + .middle_managers + .role_groups + .get(&rolegroup_ref.role_group) + .and_then(|rg| rg.replicas) + .map(i32::from), + DruidRole::Coordinator => self + .spec + .coordinators + .role_groups + .get(&rolegroup_ref.role_group) + .and_then(|rg| rg.replicas) + .map(i32::from), + DruidRole::Historical => self + .spec + .historicals + .role_groups + .get(&rolegroup_ref.role_group) + .and_then(|rg| rg.replicas) + .map(i32::from), + DruidRole::Router => self + .spec + .routers + .role_groups + .get(&rolegroup_ref.role_group) + .and_then(|rg| rg.replicas) + .map(i32::from), + } + } + + pub fn build_role_properties( + &self, + ) -> HashMap< + String, + ( + Vec, + Role>, + ), + > { + let config_files = vec![ + PropertyNameKind::Env, + PropertyNameKind::File(JVM_CONFIG.to_string()), + PropertyNameKind::File(LOG4J2_CONFIG.to_string()), + PropertyNameKind::File(RUNTIME_PROPS.to_string()), + ]; + + vec![ + ( + 
DruidRole::Broker.to_string(), + (config_files.clone(), self.spec.brokers.clone().erase()), + ), + ( + DruidRole::Historical.to_string(), + (config_files.clone(), self.spec.historicals.clone().erase()), + ), + ( + DruidRole::Router.to_string(), + (config_files.clone(), self.spec.routers.clone().erase()), + ), + ( + DruidRole::MiddleManager.to_string(), + ( + config_files.clone(), + self.spec.middle_managers.clone().erase(), + ), + ), + ( + DruidRole::Coordinator.to_string(), + (config_files, self.spec.coordinators.clone().erase()), + ), + ] + .into_iter() + .collect() } /// The name of the role-level load-balanced Kubernetes `Service` @@ -341,37 +520,137 @@ impl DruidCluster { } /// Retrieve and merge resource configs for role and role groups - pub fn resolve_resource_config_for_role_and_rolegroup( + pub fn resources( &self, role: &DruidRole, rolegroup_ref: &RoleGroupRef, - ) -> Option> { - // Initialize the result with all default values as baseline - let conf_defaults = DruidConfig::default_resources(); - - let role = self.get_role(role); - - // Retrieve role resource config - let mut conf_role: Resources = - role.config.config.resources.clone().unwrap_or_default(); - - // Retrieve rolegroup specific resource config - let mut conf_rolegroup: Resources = role - .role_groups - .get(&rolegroup_ref.role_group) - .and_then(|rg| rg.config.config.resources.clone()) - .unwrap_or_default(); - - // Merge more specific configs into default config - // Hierarchy is: - // 1. RoleGroup - // 2. Role - // 3. 
Default - conf_role.merge(&conf_defaults); - conf_rolegroup.merge(&conf_role); - - tracing::debug!("Merged resource config: {:?}", conf_rolegroup); - Some(conf_rolegroup) + ) -> Result { + let mut rg_resources = self.rolegroup_resources(role, rolegroup_ref); + let mut role_resources = self.role_resources(role); + let mut default_resources = self.default_resources(role); + + let result = resource::try_merge( + default_resources.as_mut(), + role_resources.as_mut(), + rg_resources.as_mut(), + ); + + result.context(ResourceMergeSnafu) + } + + fn default_resources(&self, role: &DruidRole) -> Option { + match role { + DruidRole::Broker => Some(resource::RoleResourceEnum::Druid( + BrokerConfig::default_resources(), + )), + DruidRole::Coordinator => Some(resource::RoleResourceEnum::Druid( + CoordinatorConfig::default_resources(), + )), + DruidRole::Historical => Some(resource::RoleResourceEnum::Historical( + HistoricalConfig::default_resources(), + )), + DruidRole::MiddleManager => Some(resource::RoleResourceEnum::Druid( + MiddleManagerConfig::default_resources(), + )), + DruidRole::Router => Some(resource::RoleResourceEnum::Druid( + RouterConfig::default_resources(), + )), + } + } + + fn role_resources(&self, role: &DruidRole) -> Option { + match role { + DruidRole::Broker => self + .spec + .brokers + .clone() + .config + .config + .resources + .map(resource::RoleResourceEnum::Druid), + DruidRole::Coordinator => self + .spec + .coordinators + .clone() + .config + .config + .resources + .map(resource::RoleResourceEnum::Druid), + DruidRole::Historical => self + .spec + .historicals + .clone() + .config + .config + .resources + .map(resource::RoleResourceEnum::Historical), + DruidRole::MiddleManager => self + .spec + .middle_managers + .clone() + .config + .config + .resources + .map(resource::RoleResourceEnum::Druid), + DruidRole::Router => self + .spec + .routers + .clone() + .config + .config + .resources + .map(resource::RoleResourceEnum::Druid), + } + } + + fn 
rolegroup_resources( + &self, + role: &DruidRole, + rolegroup_ref: &RoleGroupRef, + ) -> Option { + match role { + DruidRole::Broker => self + .spec + .brokers + .clone() + .role_groups + .get(&rolegroup_ref.role_group) + .map(|rg| &rg.config.config) + .and_then(|rg| rg.resources.clone()) + .map(resource::RoleResourceEnum::Druid), + DruidRole::Coordinator => self + .spec + .coordinators + .role_groups + .get(&rolegroup_ref.role_group) + .map(|rg| &rg.config.config) + .and_then(|rg| rg.resources.clone()) + .map(resource::RoleResourceEnum::Druid), + DruidRole::MiddleManager => self + .spec + .middle_managers + .role_groups + .get(&rolegroup_ref.role_group) + .map(|rg| &rg.config.config) + .and_then(|rg| rg.resources.clone()) + .map(resource::RoleResourceEnum::Druid), + DruidRole::Historical => self + .spec + .historicals + .role_groups + .get(&rolegroup_ref.role_group) + .map(|rg| &rg.config.config) + .and_then(|rg| rg.resources.clone()) + .map(resource::RoleResourceEnum::Historical), + DruidRole::Router => self + .spec + .routers + .role_groups + .get(&rolegroup_ref.role_group) + .map(|rg| &rg.config.config) + .and_then(|rg| rg.resources.clone()) + .map(resource::RoleResourceEnum::Druid), + } } } @@ -449,31 +728,69 @@ pub struct IngestionSpec { #[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq, Serialize, Default)] #[serde(rename_all = "camelCase")] -pub struct DruidConfig { - pub resources: Option>, +pub struct BrokerConfig { + #[serde(default, skip_serializing_if = "Option::is_none")] + resources: Option>, } -impl DruidConfig { - fn default_resources() -> Resources { - Resources { - cpu: CpuLimits { - min: Some(Quantity("200m".to_owned())), - max: Some(Quantity("4".to_owned())), - }, - memory: MemoryLimits { - limit: Some(Quantity("2Gi".to_owned())), - runtime_limits: NoRuntimeLimits {}, - }, - storage: DruidStorageConfig {}, - } - } +#[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq, Serialize, Default)] +#[serde(rename_all = "camelCase")] +pub 
struct CoordinatorConfig { + #[serde(default, skip_serializing_if = "Option::is_none")] + resources: Option>, +} + +#[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq, Serialize, Default)] +#[serde(rename_all = "camelCase")] +pub struct MiddleManagerConfig { + #[serde(default, skip_serializing_if = "Option::is_none")] + resources: Option>, +} + +#[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq, Serialize, Default)] +#[serde(rename_all = "camelCase")] +pub struct RouterConfig { + #[serde(default, skip_serializing_if = "Option::is_none")] + resources: Option>, } -#[derive(Clone, Debug, Default, Deserialize, Eq, JsonSchema, Merge, PartialEq, Serialize)] +#[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq, Serialize, Default)] #[serde(rename_all = "camelCase")] -pub struct DruidStorageConfig {} +pub struct HistoricalConfig { + #[serde(default, skip_serializing_if = "Option::is_none")] + resources: Option>, +} + +impl MiddleManagerConfig { + pub fn default_resources() -> Resources { + resource::DEFAULT_RESOURCES.clone() + } +} + +impl CoordinatorConfig { + pub fn default_resources() -> Resources { + resource::DEFAULT_RESOURCES.clone() + } +} +impl RouterConfig { + pub fn default_resources() -> Resources { + resource::DEFAULT_RESOURCES.clone() + } +} + +impl BrokerConfig { + pub fn default_resources() -> Resources { + resource::DEFAULT_RESOURCES.clone() + } +} -impl Configuration for DruidConfig { +impl HistoricalConfig { + pub fn default_resources() -> Resources { + resource::HISTORICAL_RESOURCES.clone() + } +} + +impl Configuration for BrokerConfig { type Configurable = DruidCluster; fn compute_env( @@ -499,111 +816,151 @@ impl Configuration for DruidConfig { role_name: &str, file: &str, ) -> Result>, ConfigError> { - let role = DruidRole::from_str(role_name).unwrap(); + resource.common_compute_files(role_name, file) + } +} - let mut result = BTreeMap::new(); - match file { - JVM_CONFIG => {} - RUNTIME_PROPS => { - // Plaintext port - 
result.insert( - PLAINTEXT.to_string(), - Some(role.get_http_port().to_string()), - ); - // extensions - let mut extensions = vec![ - String::from(EXT_KAFKA_INDEXING), - String::from(EXT_DATASKETCHES), - String::from(PROMETHEUS_EMITTER), - String::from(EXT_BASIC_SECURITY), - String::from(EXT_OPA_AUTHORIZER), - String::from(EXT_HDFS), - ]; - // metadata storage - let mds = &resource.spec.metadata_storage_database; - match mds.db_type { - DbType::Derby => {} // no additional extensions required - DbType::Postgresql => extensions.push(EXT_PSQL_MD_ST.to_string()), - DbType::Mysql => extensions.push(EXT_MYSQL_MD_ST.to_string()), - } - result.insert(MD_ST_TYPE.to_string(), Some(mds.db_type.to_string())); - result.insert( - MD_ST_CONNECT_URI.to_string(), - Some(mds.conn_string.to_string()), - ); - result.insert(MD_ST_HOST.to_string(), Some(mds.host.to_string())); - result.insert(MD_ST_PORT.to_string(), Some(mds.port.to_string())); - if let Some(user) = &mds.user { - result.insert(MD_ST_USER.to_string(), Some(user.to_string())); - } - if let Some(password) = &mds.password { - result.insert(MD_ST_PASSWORD.to_string(), Some(password.to_string())); - } - // s3 - if resource.uses_s3() { - extensions.push(EXT_S3.to_string()); - } - // OPA - if let Some(_opa) = &resource.spec.opa { - result.insert( - AUTH_AUTHORIZERS.to_string(), - Some(AUTH_AUTHORIZERS_VALUE.to_string()), - ); - result.insert( - AUTH_AUTHORIZER_OPA_TYPE.to_string(), - Some(AUTH_AUTHORIZER_OPA_TYPE_VALUE.to_string()), - ); - // The opaUri still needs to be set, but that requires a discovery config map and is handled in the druid_controller.rs - } - // deep storage - result.insert( - DS_TYPE.to_string(), - Some(resource.spec.deep_storage.to_string()), - ); - match &resource.spec.deep_storage { - DeepStorageSpec::HDFS(hdfs) => { - result.insert(DS_DIRECTORY.to_string(), Some(hdfs.directory.to_string())); - } - DeepStorageSpec::S3(s3_spec) => { - if let Some(key) = &s3_spec.base_key { - 
result.insert(DS_BASE_KEY.to_string(), Some(key.to_string())); - } - // bucket information (name, connection) needs to be resolved first, - // that is done directly in the controller - } - } - // other - result.insert( - EXTENSIONS_LOADLIST.to_string(), - Some(build_string_list(&extensions)), - ); - // metrics - result.insert( - PROMETHEUS_PORT.to_string(), - Some(DRUID_METRICS_PORT.to_string()), - ); - // Role-specific config - if role == DruidRole::MiddleManager { - // When we start ingestion jobs they will run as new JVM processes. - // We need to set this config to pass the custom truststore not only to the Druid roles but also to the started ingestion jobs. - result.insert( - INDEXER_JAVA_OPTS.to_string(), - Some(build_string_list(&[ - format!("-Djavax.net.ssl.trustStore={STACKABLE_TRUST_STORE}"), - format!("-Djavax.net.ssl.trustStorePassword={STACKABLE_TRUST_STORE_PASSWORD}"), - "-Djavax.net.ssl.trustStoreType=pkcs12".to_string() - ])) - ); - } - } - LOG4J2_CONFIG => {} - _ => {} +impl Configuration for HistoricalConfig { + type Configurable = DruidCluster; + + fn compute_env( + &self, + _resource: &Self::Configurable, + _role_name: &str, + ) -> Result>, ConfigError> { + let mut _result = BTreeMap::new(); + Ok(_result) + } + + fn compute_cli( + &self, + _resource: &Self::Configurable, + _role_name: &str, + ) -> Result>, ConfigError> { + Ok(BTreeMap::new()) + } + + fn compute_files( + &self, + resource: &Self::Configurable, + role_name: &str, + file: &str, + ) -> Result>, ConfigError> { + let mut result = resource.common_compute_files(role_name, file)?; + + if let Some(Resources { storage, .. 
}) = &self.resources.as_ref() { + let max_size = storage.segment_cache_max_size(); + result.insert( + SC_LOCATIONS.to_string(), + Some(format!( + "[{{\"path\":\"{SC_DIRECTORY}\",\"maxSize\":\"{max_size}\"}}]" + )), + ); } Ok(result) } } +impl Configuration for RouterConfig { + type Configurable = DruidCluster; + + fn compute_env( + &self, + _resource: &Self::Configurable, + _role_name: &str, + ) -> Result>, ConfigError> { + let mut _result = BTreeMap::new(); + Ok(_result) + } + + fn compute_cli( + &self, + _resource: &Self::Configurable, + _role_name: &str, + ) -> Result>, ConfigError> { + Ok(BTreeMap::new()) + } + + fn compute_files( + &self, + resource: &Self::Configurable, + role_name: &str, + file: &str, + ) -> Result>, ConfigError> { + resource.common_compute_files(role_name, file) + } +} + +impl Configuration for MiddleManagerConfig { + type Configurable = DruidCluster; + + fn compute_env( + &self, + _resource: &Self::Configurable, + _role_name: &str, + ) -> Result>, ConfigError> { + let mut _result = BTreeMap::new(); + Ok(_result) + } + + fn compute_cli( + &self, + _resource: &Self::Configurable, + _role_name: &str, + ) -> Result>, ConfigError> { + Ok(BTreeMap::new()) + } + + fn compute_files( + &self, + resource: &Self::Configurable, + role_name: &str, + file: &str, + ) -> Result>, ConfigError> { + let mut result = resource.common_compute_files(role_name, file)?; + result.insert( + INDEXER_JAVA_OPTS.to_string(), + Some(build_string_list(&[ + format!("-Djavax.net.ssl.trustStore={STACKABLE_TRUST_STORE}"), + format!("-Djavax.net.ssl.trustStorePassword={STACKABLE_TRUST_STORE_PASSWORD}"), + "-Djavax.net.ssl.trustStoreType=pkcs12".to_string(), + ])), + ); + Ok(result) + } +} + +impl Configuration for CoordinatorConfig { + type Configurable = DruidCluster; + + fn compute_env( + &self, + _resource: &Self::Configurable, + _role_name: &str, + ) -> Result>, ConfigError> { + let mut _result = BTreeMap::new(); + Ok(_result) + } + + fn compute_cli( + &self, + _resource: 
&Self::Configurable, + _role_name: &str, + ) -> Result>, ConfigError> { + Ok(BTreeMap::new()) + } + + fn compute_files( + &self, + resource: &Self::Configurable, + role_name: &str, + file: &str, + ) -> Result>, ConfigError> { + resource.common_compute_files(role_name, file) + } +} + #[derive(Clone, Debug, Default, Deserialize, JsonSchema, Serialize)] #[serde(rename_all = "camelCase")] pub struct DruidClusterStatus {} @@ -619,101 +976,12 @@ fn build_string_list(strings: &[String]) -> String { #[cfg(test)] mod tests { use super::*; - use crate::DeepStorageSpec::HDFS; - use stackable_operator::role_utils::CommonConfiguration; - use stackable_operator::role_utils::RoleGroup; - use std::collections::HashMap; #[test] fn test_service_name_generation() { - let mut cluster = DruidCluster::new( - "testcluster", - DruidClusterSpec { - stopped: None, - version: "".to_string(), - brokers: Role { - config: CommonConfiguration { - config: DruidConfig { - resources: Some(DruidConfig::default_resources()), - }, - config_overrides: Default::default(), - env_overrides: Default::default(), - cli_overrides: Default::default(), - }, - role_groups: Default::default(), - }, - coordinators: Role { - config: CommonConfiguration { - config: DruidConfig { - resources: Some(DruidConfig::default_resources()), - }, - config_overrides: Default::default(), - env_overrides: Default::default(), - cli_overrides: Default::default(), - }, - role_groups: Default::default(), - }, - historicals: Role { - config: CommonConfiguration { - config: DruidConfig { - resources: Some(DruidConfig::default_resources()), - }, - config_overrides: Default::default(), - env_overrides: Default::default(), - cli_overrides: Default::default(), - }, - role_groups: Default::default(), - }, - middle_managers: Role { - config: CommonConfiguration { - config: DruidConfig { - resources: Some(DruidConfig::default_resources()), - }, - config_overrides: Default::default(), - env_overrides: Default::default(), - cli_overrides: 
Default::default(), - }, - role_groups: Default::default(), - }, - routers: Role { - config: CommonConfiguration { - config: DruidConfig { - resources: Some(DruidConfig::default_resources()), - }, - config_overrides: Default::default(), - env_overrides: Default::default(), - cli_overrides: Default::default(), - }, - role_groups: [( - "default".to_string(), - RoleGroup { - config: CommonConfiguration { - config: DruidConfig { - resources: Some(DruidConfig::default_resources()), - }, - config_overrides: Default::default(), - env_overrides: Default::default(), - cli_overrides: Default::default(), - }, - replicas: Some(1), - selector: None, - }, - )] - .into_iter() - .collect::>(), - }, - metadata_storage_database: Default::default(), - deep_storage: HDFS(HdfsDeepStorageSpec { - config_map_name: "simple-hdfs".to_string(), - directory: "/path/to/dir".to_string(), - }), - ingestion: Default::default(), - zookeeper_config_map_name: Default::default(), - opa: Default::default(), - }, - ); - - cluster.metadata.namespace = Some("default".to_string()); + let cluster_cr = + std::fs::File::open("test/resources/role_service/druid_cluster.yaml").unwrap(); + let cluster: DruidCluster = serde_yaml::from_reader(&cluster_cr).unwrap(); assert_eq!(cluster.metadata.name, Some("testcluster".to_string())); diff --git a/rust/crd/src/resource.rs b/rust/crd/src/resource.rs new file mode 100644 index 00000000..c66117f6 --- /dev/null +++ b/rust/crd/src/resource.rs @@ -0,0 +1,289 @@ +use crate::storage; +use lazy_static::lazy_static; +use snafu::Snafu; +use stackable_operator::{ + commons::resources::{CpuLimits, MemoryLimits, NoRuntimeLimits, Resources}, + config::merge::Merge, + k8s_openapi::{ + api::core::v1::ResourceRequirements, apimachinery::pkg::api::resource::Quantity, + }, +}; +use strum::{EnumDiscriminants, IntoStaticStr}; + +#[derive(Debug, Clone, PartialEq)] +pub enum RoleResourceEnum { + Druid(Resources), + Historical(Resources), +} + +impl RoleResourceEnum { + pub fn 
as_resource_requirements(&self) -> ResourceRequirements { + match self { + Self::Druid(r) => r.clone().into(), + Self::Historical(r) => r.clone().into(), + } + } + pub fn as_memory_limits(&self) -> MemoryLimits { + match self { + Self::Druid(r) => r.clone().memory, + Self::Historical(r) => r.clone().memory, + } + } +} + +#[derive(Snafu, Debug, EnumDiscriminants, PartialEq, Eq)] +#[strum_discriminants(derive(IntoStaticStr))] +#[allow(clippy::enum_variant_names)] +pub enum Error { + #[snafu(display("no resources available for merging"))] + NoResourcesToMerge, + #[snafu(display("cannot merge storage types of different roles"))] + IncompatibleStorageMerging, +} + +/// Merge resources from left to right: first > second > third. +/// Return a copy of the merged struct. +pub fn try_merge( + first: Option<&mut RoleResourceEnum>, + second: Option<&mut RoleResourceEnum>, + third: Option<&mut RoleResourceEnum>, +) -> Result { + let mut some = [first, second, third] + .into_iter() + .flatten() + .collect::>(); + + match some.len() { + 1 => Ok(some[0].clone()), + 2 => { + let tmp = some[0].clone(); + try_merge_private(some[1], &tmp) + } + 3 => { + let mut tmp = some[0].clone(); + tmp = try_merge_private(some[1], &tmp)?; + try_merge_private(some[2], &tmp) + } + _ => Err(Error::NoResourcesToMerge), + } +} + +/// Merges `rb` into `ra`, i.e. `ra` has precedence over `rb`. +fn try_merge_private( + ra: &mut RoleResourceEnum, + rb: &RoleResourceEnum, +) -> Result { + match (ra, rb) { + (RoleResourceEnum::Druid(a), RoleResourceEnum::Druid(b)) => { + a.merge(b); + Ok(RoleResourceEnum::Druid(a.clone())) + } + (RoleResourceEnum::Historical(a), RoleResourceEnum::Historical(b)) => { + a.merge(b); + Ok(RoleResourceEnum::Historical(a.clone())) + } + _ => Err(Error::IncompatibleStorageMerging), + } +} + +lazy_static! 
{ + pub static ref DEFAULT_RESOURCES: Resources = + Resources { + cpu: CpuLimits { + min: Some(Quantity("200m".to_owned())), + max: Some(Quantity("4".to_owned())), + }, + memory: MemoryLimits { + limit: Some(Quantity("2Gi".to_owned())), + runtime_limits: NoRuntimeLimits {}, + }, + storage: storage::DruidStorage {}, + }; + pub static ref HISTORICAL_RESOURCES: Resources = + Resources { + cpu: CpuLimits { + min: Some(Quantity("200m".to_owned())), + max: Some(Quantity("4".to_owned())), + }, + memory: MemoryLimits { + limit: Some(Quantity("2Gi".to_owned())), + runtime_limits: NoRuntimeLimits {}, + }, + storage: storage::HistoricalStorage { + segment_cache_size_gb: Some(1), + }, + }; +} + +#[cfg(test)] +mod test { + use super::*; + use rstest::*; + + #[rstest] + #[case( + Some(RoleResourceEnum::Historical(Resources { + cpu: CpuLimits { + min: Some(Quantity("200m".to_owned())), + max: Some(Quantity("4".to_owned())), + }, + memory: MemoryLimits { + limit: Some(Quantity("2Gi".to_owned())), + runtime_limits: NoRuntimeLimits {}, + }, + storage: storage::HistoricalStorage { + segment_cache_size_gb: Some(1), + }, + })), + None, + None, + Ok(RoleResourceEnum::Historical(Resources { + cpu: CpuLimits { + min: Some(Quantity("200m".to_owned())), + max: Some(Quantity("4".to_owned())), + }, + memory: MemoryLimits { + limit: Some(Quantity("2Gi".to_owned())), + runtime_limits: NoRuntimeLimits {}, + }, + storage: storage::HistoricalStorage { + segment_cache_size_gb: Some(1), + }, + })), + )] + #[case( + Some(RoleResourceEnum::Historical(Resources { + cpu: CpuLimits { + min: Some(Quantity("200m".to_owned())), + max: Some(Quantity("4".to_owned())), + }, + memory: MemoryLimits { + limit: Some(Quantity("2Gi".to_owned())), + runtime_limits: NoRuntimeLimits {}, + }, + storage: storage::HistoricalStorage { + segment_cache_size_gb: Some(1), + }, + })), + Some(RoleResourceEnum::Historical(Resources { + cpu: CpuLimits { + min: Some(Quantity("200m".to_owned())), + max: 
Some(Quantity("4".to_owned())), + }, + memory: MemoryLimits { + limit: Some(Quantity("2Gi".to_owned())), + runtime_limits: NoRuntimeLimits {}, + }, + storage: storage::HistoricalStorage { + segment_cache_size_gb: Some(2), + }, + })), + None, + Ok(RoleResourceEnum::Historical(Resources { + cpu: CpuLimits { + min: Some(Quantity("200m".to_owned())), + max: Some(Quantity("4".to_owned())), + }, + memory: MemoryLimits { + limit: Some(Quantity("2Gi".to_owned())), + runtime_limits: NoRuntimeLimits {}, + }, + storage: storage::HistoricalStorage { + segment_cache_size_gb: Some(2), + }, + })), + )] + #[case( + Some(RoleResourceEnum::Historical(Resources { + cpu: CpuLimits { + min: Some(Quantity("200m".to_owned())), + max: Some(Quantity("4".to_owned())), + }, + memory: MemoryLimits { + limit: Some(Quantity("2Gi".to_owned())), + runtime_limits: NoRuntimeLimits {}, + }, + storage: storage::HistoricalStorage { + segment_cache_size_gb: Some(1), + }, + })), + Some(RoleResourceEnum::Historical(Resources { + cpu: CpuLimits { + min: Some(Quantity("200m".to_owned())), + max: Some(Quantity("4".to_owned())), + }, + memory: MemoryLimits { + limit: Some(Quantity("2Gi".to_owned())), + runtime_limits: NoRuntimeLimits {}, + }, + storage: storage::HistoricalStorage { + segment_cache_size_gb: Some(2), + }, + })), + Some(RoleResourceEnum::Historical(Resources { + cpu: CpuLimits { + min: Some(Quantity("200m".to_owned())), + max: Some(Quantity("4".to_owned())), + }, + memory: MemoryLimits { + limit: Some(Quantity("2Gi".to_owned())), + runtime_limits: NoRuntimeLimits {}, + }, + storage: storage::HistoricalStorage { + segment_cache_size_gb: Some(3), + }, + })), + Ok(RoleResourceEnum::Historical(Resources { + cpu: CpuLimits { + min: Some(Quantity("200m".to_owned())), + max: Some(Quantity("4".to_owned())), + }, + memory: MemoryLimits { + limit: Some(Quantity("2Gi".to_owned())), + runtime_limits: NoRuntimeLimits {}, + }, + storage: storage::HistoricalStorage { + segment_cache_size_gb: Some(3), + }, + 
})), + )] + #[case( + Some(RoleResourceEnum::Historical(Resources { + cpu: CpuLimits { + min: Some(Quantity("200m".to_owned())), + max: Some(Quantity("4".to_owned())), + }, + memory: MemoryLimits { + limit: Some(Quantity("2Gi".to_owned())), + runtime_limits: NoRuntimeLimits {}, + }, + storage: storage::HistoricalStorage { + segment_cache_size_gb: Some(1), + }, + })), + Some(RoleResourceEnum::Druid(Resources { + cpu: CpuLimits { + min: Some(Quantity("200m".to_owned())), + max: Some(Quantity("4".to_owned())), + }, + memory: MemoryLimits { + limit: Some(Quantity("2Gi".to_owned())), + runtime_limits: NoRuntimeLimits {}, + }, + storage: storage::DruidStorage { }, + })), + None, + Err(Error::IncompatibleStorageMerging), + )] + #[case(None, None, None, Err(Error::NoResourcesToMerge))] + pub fn test_try_merge( + #[case] mut first: Option, + #[case] mut second: Option, + #[case] mut third: Option, + #[case] expected: Result, + ) { + let got = try_merge(first.as_mut(), second.as_mut(), third.as_mut()); + + assert_eq!(expected, got); + } +} diff --git a/rust/crd/src/storage.rs b/rust/crd/src/storage.rs new file mode 100644 index 00000000..567a410b --- /dev/null +++ b/rust/crd/src/storage.rs @@ -0,0 +1,48 @@ +use stackable_operator::{ + builder::{ContainerBuilder, PodBuilder, VolumeBuilder}, + config::merge::Merge, + k8s_openapi::apimachinery::pkg::api::resource::Quantity, + schemars::{self, JsonSchema}, +}; + +use serde::{Deserialize, Serialize}; + +use crate::{SC_DIRECTORY, SC_VOLUME_NAME}; + +/// Storage configuration used by all roles except historical +#[derive(Clone, Debug, Default, Deserialize, Eq, JsonSchema, Merge, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct DruidStorage {} + +#[derive(Clone, Debug, Default, Deserialize, JsonSchema, Merge, PartialEq, Serialize, Eq)] +#[serde(rename_all = "camelCase")] +pub struct HistoricalStorage { + pub segment_cache_size_gb: Option, +} + +impl HistoricalStorage { + /// Update the Pod with storage for the 
historical segment cache. + pub fn update_container(&self, pb: &mut PodBuilder, cb: &mut ContainerBuilder) { + let volume_size = match self.segment_cache_size_gb { + Some(v) => v, + _ => 1, + }; + cb.add_volume_mount(SC_VOLUME_NAME, SC_DIRECTORY); + pb.add_volume( + VolumeBuilder::new(SC_VOLUME_NAME) + .with_empty_dir(Some(""), Some(Quantity(format!("{}G", volume_size)))) + .build(), + ); + } + + /// This cannot fail (i.e. return Result) and must always return a valid quantity. + /// It computes the maximum segment cache size as 90% of the volume size. + pub fn segment_cache_max_size(&self) -> String { + if let Some(volume_size) = self.segment_cache_size_gb { + if volume_size > 1u16 { + return format!("{:.0}m", volume_size as f32 * 1024.0 * 0.9); + } + } + "900m".to_string() + } +} diff --git a/rust/crd/test/resources/role_service/druid_cluster.yaml b/rust/crd/test/resources/role_service/druid_cluster.yaml new file mode 100644 index 00000000..23617db1 --- /dev/null +++ b/rust/crd/test/resources/role_service/druid_cluster.yaml @@ -0,0 +1,41 @@ +--- +apiVersion: druid.stackable.tech/v1alpha1 +kind: DruidCluster +metadata: + name: testcluster + namespace: default + uid: test-uid +spec: + version: 24.0.0-stackable0.1.0 + zookeeperConfigMapName: psql-druid-znode + metadataStorageDatabase: + dbType: postgresql + connString: jdbc:postgresql://druid-postgresql/druid + host: druid-postgresql + port: 5432 + user: druid + password: druid + deepStorage: + hdfs: + configMapName: simple-hdfs + directory: /druid + brokers: + roleGroups: + default: + replicas: 1 + coordinators: + roleGroups: + default: + replicas: 1 + historicals: + roleGroups: + default: + replicas: 1 + middleManagers: + roleGroups: + default: + replicas: 1 + routers: + roleGroups: + default: + replicas: 1 diff --git a/rust/operator-binary/Cargo.toml b/rust/operator-binary/Cargo.toml index b22ee6c0..581338d9 100644 --- a/rust/operator-binary/Cargo.toml +++ b/rust/operator-binary/Cargo.toml @@ -30,3 +30,6 @@ 
tracing = "0.1" built = { version = "0.5", features = ["chrono", "git2"] } stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "0.25.2" } stackable-druid-crd = { path = "../crd" } + +[dev-dependencies] +rstest = "0.15" diff --git a/rust/operator-binary/src/druid_controller.rs b/rust/operator-binary/src/druid_controller.rs index 1e6cb4d3..ac6038f9 100644 --- a/rust/operator-binary/src/druid_controller.rs +++ b/rust/operator-binary/src/druid_controller.rs @@ -6,7 +6,7 @@ use crate::{ use snafu::{OptionExt, ResultExt, Snafu}; use stackable_druid_crd::{ - DeepStorageSpec, DruidCluster, DruidRole, DruidStorageConfig, APP_NAME, + resource::RoleResourceEnum, DeepStorageSpec, DruidCluster, DruidRole, APP_NAME, AUTH_AUTHORIZER_OPA_URI, CERTS_DIR, CONTAINER_HTTP_PORT, CONTAINER_METRICS_PORT, CONTROLLER_NAME, CREDENTIALS_SECRET_PROPERTY, DRUID_CONFIG_DIRECTORY, DRUID_METRICS_PORT, DS_BUCKET, HDFS_CONFIG_DIRECTORY, JVM_CONFIG, LOG4J2_CONFIG, RUNTIME_PROPS, @@ -21,7 +21,6 @@ use stackable_operator::{ cluster_resources::ClusterResources, commons::{ opa::OpaApiVersion, - resources::{NoRuntimeLimits, Resources}, s3::{S3AccessStyle, S3ConnectionSpec}, tls::{CaCert, TlsVerification}, }, @@ -52,7 +51,7 @@ use std::{ sync::Arc, time::Duration, }; -use strum::{EnumDiscriminants, IntoEnumIterator, IntoStaticStr}; +use strum::{EnumDiscriminants, IntoStaticStr}; const JVM_HEAP_FACTOR: f32 = 0.8; @@ -155,7 +154,7 @@ pub enum Error { role: String, }, #[snafu(display("failed to resolve and merge resource config for role and role group"))] - FailedToResolveResourceConfig, + FailedToResolveResourceConfig { source: stackable_druid_crd::Error }, #[snafu(display("invalid java heap config - missing default or value in crd?"))] InvalidJavaHeapConfig, #[snafu(display("failed to convert java heap config to unit [{unit}]"))] @@ -176,6 +175,14 @@ pub enum Error { source: stackable_operator::error::Error, name: String, }, + #[snafu(display("no quantity unit (k, m, g, 
etc.) given for [{value}]"))] + NoQuantityUnit { value: String }, + #[snafu(display("invalid quantity value"))] + InvalidQuantityValue { source: std::num::ParseIntError }, + #[snafu(display("segment cache location is required but missing"))] + NoSegmentCacheLocation, + #[snafu(display("role group and resource type mismatch. this is a programming error."))] + RoleResourceMismatch, } type Result = std::result::Result; @@ -240,23 +247,7 @@ pub async fn reconcile_druid(druid: Arc, ctx: Arc) -> Result< _ => None, }; - let mut roles = HashMap::new(); - - let config_files = vec![ - PropertyNameKind::Env, - PropertyNameKind::File(JVM_CONFIG.to_string()), - PropertyNameKind::File(LOG4J2_CONFIG.to_string()), - PropertyNameKind::File(RUNTIME_PROPS.to_string()), - ]; - - for role in DruidRole::iter() { - roles.insert( - role.to_string(), - (config_files.clone(), druid.get_role(&role).clone()), - ); - } - - let role_config = transform_all_roles_to_config(&*druid, roles); + let role_config = transform_all_roles_to_config(&*druid, druid.build_role_properties()); let validated_role_config = validate_all_roles_and_groups_config( druid_version(&druid)?, &role_config.context(ProductConfigTransformSnafu)?, @@ -288,7 +279,7 @@ pub async fn reconcile_druid(druid: Arc, ctx: Arc) -> Result< }; let resources = druid - .resolve_resource_config_for_role_and_rolegroup(&druid_role, &rolegroup) + .resources(&druid_role, &rolegroup) .context(FailedToResolveResourceConfigSnafu)?; let rg_service = build_rolegroup_services(&rolegroup, &druid, rolegroup_config)?; @@ -401,7 +392,7 @@ fn build_rolegroup_config_map( opa_connstr: Option<&str>, s3_conn: Option<&S3ConnectionSpec>, deep_storage_bucket_name: Option<&str>, - resources: &Resources, + resources: &RoleResourceEnum, ) -> Result { let role = DruidRole::from_str(&rolegroup.role).unwrap(); let mut cm_conf_data = BTreeMap::new(); // filename -> filecontent @@ -456,7 +447,7 @@ fn build_rolegroup_config_map( PropertyNameKind::File(file_name) if 
file_name == JVM_CONFIG => { let heap_in_mebi = to_java_heap_value( resources - .memory + .as_memory_limits() .limit .as_ref() .context(InvalidJavaHeapConfigSnafu)?, @@ -568,17 +559,13 @@ fn build_rolegroup_statefulset( druid: &DruidCluster, rolegroup_config: &HashMap>, s3_conn: Option<&S3ConnectionSpec>, - resources: &Resources, + resources: &RoleResourceEnum, ) -> Result { // setup let role = DruidRole::from_str(&rolegroup_ref.role).context(UnidentifiedDruidRoleSnafu { role: rolegroup_ref.role.to_string(), })?; let druid_version = druid_version(druid)?; - let rolegroup = druid - .get_role(&role) - .role_groups - .get(&rolegroup_ref.role_group); // init container builder let mut cb = ContainerBuilder::new(APP_NAME) @@ -675,6 +662,14 @@ fn build_rolegroup_statefulset( .build(), ); + // segment cache volume for historicals + if rolegroup_ref.role == DruidRole::Historical.to_string() { + if let RoleResourceEnum::Historical(hr) = resources { + hr.storage.update_container(&mut pb, &mut cb); + } else { + return Err(Error::RoleResourceMismatch); + } + } // readiness probe let probe = Probe { tcp_socket: Some(TCPSocketAction { @@ -686,7 +681,7 @@ fn build_rolegroup_statefulset( ..Default::default() }; cb.readiness_probe(probe); - cb.resources(resources.clone().into()); + cb.resources(resources.as_resource_requirements()); let mut container = cb.build(); container.image_pull_policy = Some("IfNotPresent".to_string()); @@ -713,7 +708,7 @@ fn build_rolegroup_statefulset( replicas: if druid.spec.stopped.unwrap_or(false) { Some(0) } else { - rolegroup.and_then(|rg| rg.replicas).map(i32::from) + druid.replicas(rolegroup_ref) }, selector: LabelSelector { match_labels: Some(role_group_selector_labels( @@ -743,3 +738,149 @@ pub fn druid_version(druid: &DruidCluster) -> Result<&str> { pub fn error_policy(_error: &Error, _ctx: Arc) -> Action { Action::requeue(Duration::from_secs(5)) } + +#[cfg(test)] +mod test { + + use super::*; + use rstest::*; + use 
stackable_druid_crd::{SC_LOCATIONS, SC_VOLUME_NAME}; + use stackable_operator::k8s_openapi::apimachinery::pkg::api::resource::Quantity; + use stackable_operator::product_config::ProductConfigManager; + + #[derive(Snafu, Debug, EnumDiscriminants)] + #[strum_discriminants(derive(IntoStaticStr))] + #[allow(clippy::enum_variant_names)] + pub enum Error { + #[snafu(display("controller error"))] + Controller { source: super::Error }, + #[snafu(display("product config error"))] + ProductConfig { + source: stackable_operator::product_config::error::Error, + }, + #[snafu(display("product config utils error"))] + ProductConfigUtils { + source: stackable_operator::product_config_utils::ConfigError, + }, + #[snafu(display("operator framework error"))] + OperatorFramework { + source: stackable_operator::error::Error, + }, + #[snafu(display("resource error"))] + CrdResource { source: stackable_druid_crd::Error }, + } + + #[rstest] + #[case( + "druid_cluster.yaml", + "default", + "[{\"path\":\"/stackable/var/druid/segment-cache\",\"maxSize\":\"2765m\"}]", + "3G" + )] + #[case( + "druid_cluster.yaml", + "secondary", + "[{\"path\":\"/stackable/var/druid/segment-cache\",\"maxSize\":\"1843m\"}]", + "2G" + )] + #[case( + "druid_cluster_defaults.yaml", + "default", + "[{\"path\":\"/stackable/var/druid/segment-cache\",\"maxSize\":\"900m\"}]", + "1G" + )] + pub fn historical_segment_cache( + #[case] druid_manifest: &str, + #[case] tested_rolegroup_name: &str, + #[case] expected_druid_segment_cache_property: &str, + #[case] expected_segment_cache_volume_size: &str, + ) -> Result<(), Error> { + let cluster_cr = std::fs::File::open(format!( + "test/resources/historical_segment_cache/{druid_manifest}" + )) + .unwrap(); + let druid: DruidCluster = serde_yaml::from_reader(&cluster_cr).unwrap(); + let role_config = transform_all_roles_to_config(&druid, druid.build_role_properties()); + + let product_config_manager = ProductConfigManager::from_yaml_file( + 
"test/resources/historical_segment_cache/properties.yaml", + ) + .context(ProductConfigSnafu)?; + + let validated_role_config = validate_all_roles_and_groups_config( + druid_version(&druid).context(ControllerSnafu)?, + &role_config.context(ProductConfigUtilsSnafu)?, + &product_config_manager, + false, + false, + ) + .context(OperatorFrameworkSnafu)?; + + let mut volume_segment_cache_size = Quantity("-1".to_string()); + let mut druid_segment_cache_property = "invalid".to_string(); + + for (role_name, role_config) in validated_role_config.iter() { + for (rolegroup_name, rolegroup_config) in role_config.iter() { + if rolegroup_name == tested_rolegroup_name + && role_name == &DruidRole::Historical.to_string() + { + let rolegroup_ref = RoleGroupRef { + cluster: ObjectRef::from_obj(&druid), + role: role_name.into(), + role_group: rolegroup_name.clone(), + }; + + let resources = druid + .resources(&DruidRole::Historical, &rolegroup_ref) + .context(CrdResourceSnafu)?; + + let sts = build_rolegroup_statefulset( + &rolegroup_ref, + &druid, + rolegroup_config, + None, + &resources, + ) + .context(ControllerSnafu)?; + + volume_segment_cache_size = sts + .spec + .unwrap() + .template + .spec + .unwrap() + .volumes + .unwrap() + .into_iter() + .find(|v| v.name == SC_VOLUME_NAME) + .unwrap() + .empty_dir + .unwrap() + .size_limit + .unwrap(); + + druid_segment_cache_property = role_config + .get(rolegroup_name) + .unwrap() + .get(&PropertyNameKind::File(RUNTIME_PROPS.to_string())) + .unwrap() + .get(&SC_LOCATIONS.to_string()) + .unwrap() + .clone(); + + break; + } + } + } + assert_eq!( + druid_segment_cache_property, + expected_druid_segment_cache_property + ); + assert_eq!( + volume_segment_cache_size, + Quantity(expected_segment_cache_volume_size.to_string()) + ); + + Ok(()) + } +} diff --git a/rust/operator-binary/test/resources/historical_segment_cache/druid_cluster.yaml b/rust/operator-binary/test/resources/historical_segment_cache/druid_cluster.yaml new file mode 100644 
index 00000000..426e7fe4 --- /dev/null +++ b/rust/operator-binary/test/resources/historical_segment_cache/druid_cluster.yaml @@ -0,0 +1,65 @@ +--- +apiVersion: druid.stackable.tech/v1alpha1 +kind: DruidCluster +metadata: + name: psql-druid + uid: test-uid +spec: + version: 24.0.0-stackable0.1.0 + zookeeperConfigMapName: psql-druid-znode + metadataStorageDatabase: + dbType: postgresql + connString: jdbc:postgresql://druid-postgresql/druid + host: druid-postgresql + port: 5432 + user: druid + password: druid + deepStorage: + hdfs: + configMapName: simple-hdfs + directory: /druid + brokers: + roleGroups: + default: + config: {} + replicas: 1 + coordinators: + roleGroups: + default: + config: {} + replicas: 1 + historicals: + config: + resources: + cpu: + min: '200m' + max: '4' + memory: + limit: '2Gi' + storage: + segmentCacheSizeGb: 2 + roleGroups: + default: + config: + resources: + cpu: + min: '200m' + max: '4' + memory: + limit: '2Gi' + storage: + segmentCacheSizeGb: 3 + replicas: 1 + secondary: + config: {} + replicas: 1 + middleManagers: + roleGroups: + default: + config: {} + replicas: 1 + routers: + roleGroups: + default: + config: {} + replicas: 1 diff --git a/rust/operator-binary/test/resources/historical_segment_cache/druid_cluster_defaults.yaml b/rust/operator-binary/test/resources/historical_segment_cache/druid_cluster_defaults.yaml new file mode 100644 index 00000000..32eb2d9b --- /dev/null +++ b/rust/operator-binary/test/resources/historical_segment_cache/druid_cluster_defaults.yaml @@ -0,0 +1,48 @@ +--- +apiVersion: druid.stackable.tech/v1alpha1 +kind: DruidCluster +metadata: + name: psql-druid + uid: test-uid ### needed for ownerreference_from_resource() to succeed +spec: + version: 24.0.0-stackable0.1.0 + zookeeperConfigMapName: psql-druid-znode + metadataStorageDatabase: + dbType: postgresql + connString: jdbc:postgresql://druid-postgresql/druid + host: druid-postgresql + port: 5432 + user: druid + password: druid + deepStorage: + hdfs: + 
configMapName: simple-hdfs + directory: /druid + brokers: + roleGroups: + default: + replicas: 1 + config: {} + coordinators: + roleGroups: + default: + replicas: 1 + config: {} + historicals: + roleGroups: + default: + replicas: 1 + config: {} + secondary: + replicas: 1 + config: {} + middleManagers: + roleGroups: + default: + replicas: 1 + config: {} + routers: + roleGroups: + default: + replicas: 1 + config: {} diff --git a/rust/operator-binary/test/resources/historical_segment_cache/properties.yaml b/rust/operator-binary/test/resources/historical_segment_cache/properties.yaml new file mode 100644 index 00000000..41e1c084 --- /dev/null +++ b/rust/operator-binary/test/resources/historical_segment_cache/properties.yaml @@ -0,0 +1,766 @@ +--- +version: 0.1.0 +spec: + units: + - unit: &unitDirectory + name: "directory" + regex: "^/|(/[\\w-]+)+$" + examples: + - "/tmp/xyz" + - unit: &unitPort + name: "port" + regex: "^([0-9]{1,4}|[1-5][0-9]{4}|6[0-4][0-9]{3}|65[0-4][0-9]{2}|655[0-2][0-9]|6553[0-5])$" + - unit: &unitPrometheusNamespace + name: "prometheusNamespace" + regex: "^[a-zA-Z_:][a-zA-Z0-9_:]*$" + - unit: &unitDuration + name: "duration" + regex: "^P(?!$)(\\d+Y)?(\\d+M)?(\\d+W)?(\\d+D)?(T(?=\\d)(\\d+H)?(\\d+M)?(\\d+S)?)?$" + examples: + - "PT300S" + +################################################################################################### +# runtime.properties +# For information on the properties, see: https://druid.apache.org/docs/latest/configuration/index.html +################################################################################################### + +properties: + + - property: &plaintext + propertyNames: + - name: "druid.plaintext" + kind: + type: "file" + file: "runtime.properties" + datatype: + type: "integer" + min: "1" + max: "65535" + unit: *unitPort + roles: + - name: "broker" + required: true + - name: "coordinator" + required: true + - name: "historical" + required: true + - name: "middleManager" + required: true + - name: 
"router" + required: true + asOfVersion: "0.0.0" + + - property: &startupLoggingLogProperties + propertyNames: + - name: "druid.startup.logging.logProperties" + kind: + type: "file" + file: "runtime.properties" + datatype: + type: "bool" + defaultValues: + - fromVersion: "0.0.0" + value: "true" + roles: + - name: "broker" + required: true + - name: "coordinator" + required: true + - name: "historical" + required: true + - name: "middleManager" + required: true + - name: "router" + required: true + asOfVersion: "0.0.0" + + - property: &monitoringMonitors + propertyNames: + - name: "druid.monitoring.monitors" + kind: + type: "file" + file: "runtime.properties" + datatype: + type: "string" + defaultValues: + - fromVersion: "0.0.0" + value: "[\"org.apache.druid.java.util.metrics.JvmMonitor\"]" + roles: + - name: "broker" + required: true + - name: "coordinator" + required: true + - name: "historical" + required: true + - name: "middleManager" + required: true + - name: "router" + required: true + asOfVersion: "0.0.0" + + - property: &emitter + propertyNames: + - name: "druid.emitter" + kind: + type: "file" + file: "runtime.properties" + datatype: + type: "string" + defaultValues: + - fromVersion: "0.0.0" + value: "prometheus" + allowedValues: + - "noop" + - "logging" + - "http" + - "parametrized" + - "composing" + - "prometheus" + roles: + - name: "broker" + required: true + - name: "coordinator" + required: true + - name: "historical" + required: true + - name: "middleManager" + required: true + - name: "router" + required: true + asOfVersion: "0.0.0" + + - property: &emitterPrometheusStrategy + propertyNames: + - name: "druid.emitter.prometheus.strategy" + kind: + type: "file" + file: "runtime.properties" + datatype: + type: "string" + defaultValues: + - fromVersion: "0.0.0" + value: "exporter" + allowedValues: + - "exporter" + - "pushgateway" + roles: + - name: "broker" + required: true + - name: "coordinator" + required: true + - name: "historical" + required: true 
+ - name: "middleManager" + required: true + - name: "router" + required: true + asOfVersion: "0.0.0" + + - property: &emitterPrometheusNamespace + propertyNames: + - name: "druid.emitter.prometheus.namespace" + kind: + type: "file" + file: "runtime.properties" + datatype: + type: "string" + unit: *unitPrometheusNamespace + defaultValues: + - fromVersion: "0.0.0" + value: "druid" + roles: + - name: "broker" + required: true + - name: "coordinator" + required: true + - name: "historical" + required: true + - name: "middleManager" + required: true + - name: "router" + required: true + asOfVersion: "0.0.0" + + - property: &emitterPrometheusPort + propertyNames: + - name: "druid.emitter.prometheus.port" + kind: + type: "file" + file: "runtime.properties" + datatype: + type: "integer" + min: "1" + max: "65535" + unit: *unitPort + roles: + - name: "broker" + required: true + - name: "coordinator" + required: true + - name: "historical" + required: true + - name: "middleManager" + required: true + - name: "router" + required: true + asOfVersion: "0.0.0" + + - property: &metadataStorageType + propertyNames: + - name: "druid.metadata.storage.type" + kind: + type: "file" + file: "runtime.properties" + datatype: + type: "string" + allowedValues: + - "mysql" + - "postgresql" + - "derby" + roles: + - name: "broker" + required: true + - name: "coordinator" + required: true + - name: "historical" + required: true + - name: "middleManager" + required: true + - name: "router" + required: true + asOfVersion: "0.0.0" + + - property: &metadataStorageConnectURI + propertyNames: + - name: "druid.metadata.storage.connector.connectURI" + kind: + type: "file" + file: "runtime.properties" + datatype: + type: "string" + roles: + - name: "broker" + required: true + - name: "coordinator" + required: true + - name: "historical" + required: true + - name: "middleManager" + required: true + - name: "router" + required: true + asOfVersion: "0.0.0" + + - property: &metadataStorageHost + 
propertyNames: + - name: "druid.metadata.storage.connector.host" + kind: + type: "file" + file: "runtime.properties" + datatype: + type: "string" + roles: + - name: "broker" + required: true + - name: "coordinator" + required: true + - name: "historical" + required: true + - name: "middleManager" + required: true + - name: "router" + required: true + asOfVersion: "0.0.0" + + - property: &metadataStoragePort + propertyNames: + - name: "druid.metadata.storage.connector.port" + kind: + type: "file" + file: "runtime.properties" + datatype: + type: "integer" + min: "1024" + max: "65535" + unit: *unitPort + roles: + - name: "broker" + required: true + - name: "coordinator" + required: true + - name: "historical" + required: true + - name: "middleManager" + required: true + - name: "router" + required: true + asOfVersion: "0.0.0" + + - property: &metadataStorageUser + propertyNames: + - name: "druid.metadata.storage.connector.user" + kind: + type: "file" + file: "runtime.properties" + datatype: + type: "string" + roles: + - name: "broker" + required: false + - name: "coordinator" + required: false + - name: "historical" + required: false + - name: "middleManager" + required: false + - name: "router" + required: false + asOfVersion: "0.0.0" + + - property: &metadataStoragePassword + propertyNames: + - name: "druid.metadata.storage.connector.password" + kind: + type: "file" + file: "runtime.properties" + datatype: + type: "string" + roles: + - name: "broker" + required: false + - name: "coordinator" + required: false + - name: "historical" + required: false + - name: "middleManager" + required: false + - name: "router" + required: false + asOfVersion: "0.0.0" + + - property: &indexerLogsDirectory + propertyNames: + - name: "druid.indexer.logs.directory" + kind: + type: "file" + file: "runtime.properties" + datatype: + type: "string" + unit: *unitDirectory + defaultValues: + - fromVersion: "0.0.0" + value: "/stackable/var/druid/indexing-logs" + roles: + - name: "broker" + 
required: true + - name: "coordinator" + required: true + - name: "historical" + required: true + - name: "middleManager" + required: true + - name: "router" + required: true + asOfVersion: "0.0.0" + + - property: &processingTmpDir + propertyNames: + - name: "druid.processing.tmpDir" + kind: + type: "file" + file: "runtime.properties" + datatype: + type: "string" + unit: *unitDirectory + defaultValues: + - fromVersion: "0.0.0" + value: "/stackable/var/druid/processing" + roles: + - name: "broker" + required: true + - name: "coordinator" + required: false + - name: "historical" + required: true + - name: "middleManager" + required: false + - name: "router" + required: false + asOfVersion: "0.0.0" + + - property: &indexerTaskHadoopWorkingPath + propertyNames: + - name: "druid.indexer.task.hadoopWorkingPath" + kind: + type: "file" + file: "runtime.properties" + datatype: + type: "string" + unit: *unitDirectory + defaultValues: + - fromVersion: "0.0.0" + value: "/stackable/var/druid/hadoop-tmp" + roles: + - name: "broker" + required: false + - name: "coordinator" + required: false + - name: "historical" + required: false + - name: "middleManager" + required: true + - name: "router" + required: false + asOfVersion: "0.0.0" + + - property: &indexerTaskBaseTaskDir + propertyNames: + - name: "druid.indexer.task.baseTaskDir" + kind: + type: "file" + file: "runtime.properties" + datatype: + type: "string" + unit: *unitDirectory + defaultValues: + - fromVersion: "0.0.0" + value: "/stackable/var/druid/task" + roles: + - name: "broker" + required: false + - name: "coordinator" + required: false + - name: "historical" + required: false + - name: "middleManager" + required: true + - name: "router" + required: false + asOfVersion: "0.0.0" + + - property: &indexerRunnerJavaOpts + propertyNames: + - name: "druid.indexer.runner.javaOpts" + kind: + type: "file" + file: "runtime.properties" + datatype: + type: "string" + defaultValues: + - fromVersion: "0.0.0" + value: "-server 
-Xms256m -Xmx256m -XX:MaxDirectMemorySize=300m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+ExitOnOutOfMemoryError -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager" + roles: + - name: "broker" + required: false + - name: "coordinator" + required: false + - name: "historical" + required: false + - name: "middleManager" + required: true + - name: "router" + required: false + asOfVersion: "0.0.0" + + - property: &routerManagementProxyEnabled + propertyNames: + # Management proxy to coordinator / overlord: required for unified web console. + - name: "druid.router.managementProxy.enabled" + kind: + type: "file" + file: "runtime.properties" + datatype: + type: "bool" + defaultValues: + - fromVersion: "0.0.0" + value: "true" + roles: + - name: "broker" + required: false + - name: "coordinator" + required: false + - name: "historical" + required: false + - name: "middleManager" + required: false + - name: "router" + required: true + asOfVersion: "0.0.0" + + - property: &routerHttpNumConnections + propertyNames: + - name: "druid.router.http.numConnections" + kind: + type: "file" + file: "runtime.properties" + datatype: + type: "integer" + defaultValues: + - fromVersion: "0.0.0" + value: "25" + roles: + - name: "broker" + required: false + - name: "coordinator" + required: false + - name: "historical" + required: false + - name: "middleManager" + required: false + - name: "router" + required: true + asOfVersion: "0.0.0" + + - property: &historicalCacheUseCache + propertyNames: + - name: "druid.historical.cache.useCache" + kind: + type: "file" + file: "runtime.properties" + datatype: + type: "bool" + defaultValues: + - fromVersion: "0.0.0" + value: "true" + roles: + - name: "broker" + required: false + - name: "coordinator" + required: false + - name: "historical" + required: true + - name: "middleManager" + required: false + - name: "router" + required: false + asOfVersion: "0.0.0" + + - property: &historicalCachePopulateCache + propertyNames: + - name: 
"druid.historical.cache.populateCache" + kind: + type: "file" + file: "runtime.properties" + datatype: + type: "bool" + defaultValues: + - fromVersion: "0.0.0" + value: "true" + roles: + - name: "broker" + required: false + - name: "coordinator" + required: false + - name: "historical" + required: true + - name: "middleManager" + required: false + - name: "router" + required: false + asOfVersion: "0.0.0" + + - property: &coordinatorStartDelay + propertyNames: + - name: "druid.coordinator.startDelay" + kind: + type: "file" + file: "runtime.properties" + datatype: + type: "string" + unit: *unitDuration + defaultValues: + - fromVersion: "0.0.0" + value: "PT20S" + roles: + - name: "broker" + required: false + - name: "coordinator" + required: true + - name: "historical" + required: false + - name: "middleManager" + required: false + - name: "router" + required: false + asOfVersion: "0.0.0" + + - property: &indexerQueueStartDelay + propertyNames: + - name: "druid.indexer.queue.startDelay" + kind: + type: "file" + file: "runtime.properties" + datatype: + type: "string" + unit: *unitDuration + defaultValues: + - fromVersion: "0.0.0" + value: "PT20S" + roles: + - name: "broker" + required: false + - name: "coordinator" + required: true + - name: "historical" + required: false + - name: "middleManager" + required: false + - name: "router" + required: false + asOfVersion: "0.0.0" + + - property: &indexerRunnerType + propertyNames: + - name: "druid.indexer.runner.type" + kind: + type: "file" + file: "runtime.properties" + datatype: + type: "string" + allowedValues: + - "local" + - "remote" + - "httpRemote" + defaultValues: + - fromVersion: "0.0.0" + value: "remote" + roles: + - name: "broker" + required: false + - name: "coordinator" + required: true + - name: "historical" + required: false + - name: "middleManager" + required: false + - name: "router" + required: false + asOfVersion: "0.0.0" + + - property: &indexerStorageType + propertyNames: + - name: 
"druid.indexer.storage.type" + kind: + type: "file" + file: "runtime.properties" + datatype: + type: "string" + allowedValues: + - "local" + - "metadata" + defaultValues: + - fromVersion: "0.0.0" + value: "metadata" + roles: + - name: "broker" + required: false + - name: "coordinator" + required: true + - name: "historical" + required: false + - name: "middleManager" + required: false + - name: "router" + required: false + asOfVersion: "0.0.0" + + - property: &coordinatorPeriod + propertyNames: + - name: "druid.coordinator.period" + kind: + type: "file" + file: "runtime.properties" + datatype: + type: "string" + unit: *unitDuration + defaultValues: + - fromVersion: "0.0.0" + value: "PT20S" + roles: + - name: "broker" + required: false + - name: "coordinator" + required: true + - name: "historical" + required: false + - name: "middleManager" + required: false + - name: "router" + required: false + asOfVersion: "0.0.0" + + - property: &coordinatorAsOverlordEnabled + propertyNames: + - name: "druid.coordinator.asOverlord.enabled" + kind: + type: "file" + file: "runtime.properties" + datatype: + type: "bool" + defaultValues: + - fromVersion: "0.0.0" + value: "true" + roles: + - name: "broker" + required: false + - name: "coordinator" + required: true + - name: "historical" + required: false + - name: "middleManager" + required: false + - name: "router" + required: false + asOfVersion: "0.0.0" + + - property: &segmentCacheLocations + propertyNames: + - name: "druid.segmentCache.locations" + kind: + type: "file" + file: "runtime.properties" + datatype: + type: "string" + defaultValues: + - fromVersion: "0.0.0" + value: "[{\"path\":\"/stackable/var/druid/segment-cache\",\"maxSize\":\"900m\"}]" + roles: + - name: "broker" + required: false + - name: "coordinator" + required: false + - name: "historical" + required: true + - name: "middleManager" + required: false + - name: "router" + required: false + asOfVersion: "0.0.0" + + - property: 
&coordinatorAsOverlordOverlordService + propertyNames: + - name: "druid.coordinator.asOverlord.overlordService" + kind: + type: "file" + file: "runtime.properties" + datatype: + type: "string" + defaultValues: + - fromVersion: "0.0.0" + value: "druid/overlord" + roles: + - name: "broker" + required: false + - name: "coordinator" + required: true + - name: "historical" + required: false + - name: "middleManager" + required: false + - name: "router" + required: false + asOfVersion: "0.0.0" + +################################################################################################### +# jvm.config +###################################################################################################