Skip to content

Commit

Permalink
feat: add decolorize processor (GreptimeTeam#5065)
Browse files Browse the repository at this point in the history
* feat: add decolorize processor

Signed-off-by: Ruihang Xia <[email protected]>

* Update src/pipeline/src/etl/processor/cmcd.rs

* add crate level integration test

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia authored Nov 29, 2024
1 parent 6308e86 commit c049ce6
Show file tree
Hide file tree
Showing 6 changed files with 239 additions and 26 deletions.
12 changes: 7 additions & 5 deletions src/pipeline/src/etl/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
pub mod cmcd;
pub mod csv;
pub mod date;
pub mod decolorize;
pub mod dissect;
pub mod epoch;
pub mod gsub;
Expand All @@ -29,6 +30,7 @@ use ahash::{HashSet, HashSetExt};
use cmcd::{CmcdProcessor, CmcdProcessorBuilder};
use csv::{CsvProcessor, CsvProcessorBuilder};
use date::{DateProcessor, DateProcessorBuilder};
use decolorize::{DecolorizeProcessor, DecolorizeProcessorBuilder};
use dissect::{DissectProcessor, DissectProcessorBuilder};
use enum_dispatch::enum_dispatch;
use epoch::{EpochProcessor, EpochProcessorBuilder};
Expand Down Expand Up @@ -61,11 +63,6 @@ const TARGET_FIELDS_NAME: &str = "target_fields";
const JSON_PATH_NAME: &str = "json_path";
const JSON_PATH_RESULT_INDEX_NAME: &str = "result_index";

// const IF_NAME: &str = "if";
// const IGNORE_FAILURE_NAME: &str = "ignore_failure";
// const ON_FAILURE_NAME: &str = "on_failure";
// const TAG_NAME: &str = "tag";

/// Processor trait defines the interface for all processors.
///
/// A processor is a transformation that can be applied to a field in a document
Expand Down Expand Up @@ -99,6 +96,7 @@ pub enum ProcessorKind {
Epoch(EpochProcessor),
Date(DateProcessor),
JsonPath(JsonPathProcessor),
Decolorize(DecolorizeProcessor),
}

/// ProcessorBuilder trait defines the interface for all processor builders
Expand Down Expand Up @@ -128,6 +126,7 @@ pub enum ProcessorBuilders {
Epoch(EpochProcessorBuilder),
Date(DateProcessorBuilder),
JsonPath(JsonPathProcessorBuilder),
Decolorize(DecolorizeProcessorBuilder),
}

#[derive(Debug, Default)]
Expand Down Expand Up @@ -275,6 +274,9 @@ fn parse_processor(doc: &yaml_rust::Yaml) -> Result<ProcessorBuilders> {
json_path::PROCESSOR_JSON_PATH => {
ProcessorBuilders::JsonPath(json_path::JsonPathProcessorBuilder::try_from(value)?)
}
decolorize::PROCESSOR_DECOLORIZE => {
ProcessorBuilders::Decolorize(DecolorizeProcessorBuilder::try_from(value)?)
}
_ => return UnsupportedProcessorSnafu { processor: str_key }.fail(),
};

Expand Down
4 changes: 4 additions & 0 deletions src/pipeline/src/etl/processor/cmcd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//! Pipeline Processor for CMCD (Common Media Client Data) data.
//!
//! Refer to [`CmcdProcessor`] for more information.
use std::collections::BTreeMap;

use ahash::HashSet;
Expand Down
195 changes: 195 additions & 0 deletions src/pipeline/src/etl/processor/decolorize.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Removes ANSI color control codes from the input text.
//!
//! Similar to [`decolorize`](https://grafana.com/docs/loki/latest/query/log_queries/#removing-color-codes)
//! from Grafana Loki and [`strip_ansi_escape_codes`](https://vector.dev/docs/reference/vrl/functions/#strip_ansi_escape_codes)
//! from Vector VRL.
use ahash::HashSet;
use once_cell::sync::Lazy;
use regex::Regex;
use snafu::OptionExt;

use crate::etl::error::{
Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
};
use crate::etl::field::{Fields, OneInputOneOutputField};
use crate::etl::processor::{
yaml_bool, yaml_new_field, yaml_new_fields, ProcessorBuilder, ProcessorKind, FIELDS_NAME,
FIELD_NAME, IGNORE_MISSING_NAME,
};
use crate::etl::value::Value;

pub(crate) const PROCESSOR_DECOLORIZE: &str = "decolorize";

static RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\x1b\[[0-9;]*m").unwrap());

#[derive(Debug, Default)]
pub struct DecolorizeProcessorBuilder {
fields: Fields,
ignore_missing: bool,
}

impl ProcessorBuilder for DecolorizeProcessorBuilder {
fn output_keys(&self) -> HashSet<&str> {
self.fields
.iter()
.map(|f| f.target_or_input_field())
.collect()
}

fn input_keys(&self) -> HashSet<&str> {
self.fields.iter().map(|f| f.input_field()).collect()
}

fn build(self, intermediate_keys: &[String]) -> Result<ProcessorKind> {
self.build(intermediate_keys).map(ProcessorKind::Decolorize)
}
}

impl DecolorizeProcessorBuilder {
fn build(self, intermediate_keys: &[String]) -> Result<DecolorizeProcessor> {
let mut real_fields = vec![];
for field in self.fields.into_iter() {
let input = OneInputOneOutputField::build(
"decolorize",
intermediate_keys,
field.input_field(),
field.target_or_input_field(),
)?;
real_fields.push(input);
}
Ok(DecolorizeProcessor {
fields: real_fields,
ignore_missing: self.ignore_missing,
})
}
}

/// Remove ANSI color control codes from the input text.
#[derive(Debug, Default)]
pub struct DecolorizeProcessor {
fields: Vec<OneInputOneOutputField>,
ignore_missing: bool,
}

impl DecolorizeProcessor {
fn process_string(&self, val: &str) -> Result<Value> {
Ok(Value::String(RE.replace_all(val, "").into_owned()))
}

fn process(&self, val: &Value) -> Result<Value> {
match val {
Value::String(val) => self.process_string(val),
_ => ProcessorExpectStringSnafu {
processor: PROCESSOR_DECOLORIZE,
v: val.clone(),
}
.fail(),
}
}
}

impl TryFrom<&yaml_rust::yaml::Hash> for DecolorizeProcessorBuilder {
type Error = Error;

fn try_from(value: &yaml_rust::yaml::Hash) -> Result<Self> {
let mut fields = Fields::default();
let mut ignore_missing = false;

for (k, v) in value.iter() {
let key = k
.as_str()
.with_context(|| KeyMustBeStringSnafu { k: k.clone() })?;

match key {
FIELD_NAME => {
fields = Fields::one(yaml_new_field(v, FIELD_NAME)?);
}
FIELDS_NAME => {
fields = yaml_new_fields(v, FIELDS_NAME)?;
}
IGNORE_MISSING_NAME => {
ignore_missing = yaml_bool(v, IGNORE_MISSING_NAME)?;
}
_ => {}
}
}

Ok(DecolorizeProcessorBuilder {
fields,
ignore_missing,
})
}
}

impl crate::etl::processor::Processor for DecolorizeProcessor {
fn kind(&self) -> &str {
PROCESSOR_DECOLORIZE
}

fn ignore_missing(&self) -> bool {
self.ignore_missing
}

fn exec_mut(&self, val: &mut Vec<Value>) -> Result<()> {
for field in self.fields.iter() {
let index = field.input_index();
match val.get(index) {
Some(Value::Null) | None => {
if !self.ignore_missing {
return ProcessorMissingFieldSnafu {
processor: self.kind(),
field: field.input_name(),
}
.fail();
}
}
Some(v) => {
let result = self.process(v)?;
let output_index = field.output_index();
val[output_index] = result;
}
}
}
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_decolorize_processor() {
let processor = DecolorizeProcessor {
fields: vec![],
ignore_missing: false,
};

let val = Value::String("\x1b[32mGreen\x1b[0m".to_string());
let result = processor.process(&val).unwrap();
assert_eq!(result, Value::String("Green".to_string()));

let val = Value::String("Plain text".to_string());
let result = processor.process(&val).unwrap();
assert_eq!(result, Value::String("Plain text".to_string()));

let val = Value::String("\x1b[46mfoo\x1b[0m bar".to_string());
let result = processor.process(&val).unwrap();
assert_eq!(result, Value::String("foo bar".to_string()));
}
}
17 changes: 0 additions & 17 deletions src/pipeline/src/etl/processor/dissect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,6 @@ impl DissectProcessor {
let mut pos = 0;

let mut appends: HashMap<usize, Vec<(String, u32)>> = HashMap::new();
// let mut maps: HashMap<usize, (String,String)> = HashMap::new();

let mut process_name_value = |name: &Name, value: String| {
let name_index = name.index;
Expand All @@ -658,22 +657,6 @@ impl DissectProcessor {
.or_default()
.push((value, order.unwrap_or_default()));
}
// Some(StartModifier::MapKey) => match maps.get(&name_index) {
// Some(map_val) => {
// map.insert(value, Value::String(map_val.to_string()));
// }
// None => {
// maps.insert(name_index, value);
// }
// },
// Some(StartModifier::MapVal) => match maps.get(&name_index) {
// Some(map_key) => {
// map.insert(map_key, Value::String(value));
// }
// None => {
// maps.insert(name_index, value);
// }
// },
Some(_) => {
// do nothing, ignore MapKey and MapVal
// because transform can know the key name
Expand Down
4 changes: 0 additions & 4 deletions src/pipeline/src/etl/processor/gsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,6 @@ impl GsubProcessor {
v: val.clone(),
}
.fail(),
// Err(format!(
// "{} processor: expect string or array string, but got {val:?}",
// self.kind()
// )),
}
}
}
Expand Down
33 changes: 33 additions & 0 deletions src/pipeline/tests/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -674,3 +674,36 @@ transform:

assert_eq!(expected, r);
}

#[test]
fn test_decolorize() {
let input_value = serde_json::json!({
"message": "\u{001b}[32mSuccess\u{001b}[0m and \u{001b}[31mError\u{001b}[0m"
});

let pipeline_yaml = r#"
processors:
- decolorize:
fields:
- message
transform:
- fields:
- message
type: string
"#;
let yaml_content = Content::Yaml(pipeline_yaml.into());
let pipeline: Pipeline<GreptimeTransformer> = parse(&yaml_content).unwrap();

let mut status = pipeline.init_intermediate_state();
pipeline.prepare(input_value, &mut status).unwrap();
let row = pipeline.exec_mut(&mut status).unwrap();

let r = row
.values
.into_iter()
.map(|v| v.value_data.unwrap())
.collect::<Vec<_>>();

let expected = StringValue("Success and Error".into());
assert_eq!(expected, r[0]);
}

0 comments on commit c049ce6

Please sign in to comment.