A lightweight Haskell framework for processing event streams (logs, user activity, transactions) using pure functions, immutability, higher-order functions, and lazy evaluation. Built as a Functional Programming course project that models patterns found in real-world stream processing systems.
- Pure functional pipeline engine for composing reusable processing stages
- Algebraic Data Types (ADTs) for safe, extensible event modeling
- Immutable streams (no in-place updates; transformations produce new streams)
- Lazy evaluation support for large inputs and infinite streams
- Clear separation of IO and pure logic for maintainability and testability
- Imperative comparison module to contrast readability and complexity
- M.N. Sulaiman (4342): Framework Core & Architecture
- M.M. Musharraf (4080): Pipeline Stages
- M.F.R. Ahamed (3807): Stream Processing
- A.C.M. Farhad (3923): Input Handling & Evaluation
A Functional Event Stream Processing Framework Using Haskell
Modern software systems generate massive volumes of events (e.g., logs, clicks, transactions). Traditional imperative solutions often rely on mutable state and loop-heavy code, which can reduce maintainability and make testing harder.
This project demonstrates how Functional Programming improves event processing through:
- deterministic computation (pure functions)
- declarative pipelines (composition)
- safe modeling via strong types and ADTs
- scalable handling of large/infinite streams using laziness
1. **Reads Data**: loads events from a file (`data/`) or generates events automatically.
2. **Creates a Stream**: represents events as an immutable list/stream.
3. **Processes via Pipelines**: runs events through composable stages such as:
   - filter (select events)
   - map (transform events)
   - aggregate (counts, sums, grouped results)
4. **Outputs Summaries**: prints computed statistics (counts, totals, etc.) to the terminal.
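The four steps above can be sketched end to end in a few lines. This is an illustrative miniature, not the framework's actual API: the event type, stage names, and summary format here are made up for the example.

```haskell
-- Minimal end-to-end sketch of the four steps (illustrative names).
data Event = Login String | Err Int | Txn String Double
  deriving (Show, Eq)

-- Steps 1-2: "read" events (here: a hard-coded sample stream).
sampleStream :: [Event]
sampleStream = [Login "user1", Err 404, Txn "user2" 250.50]

-- Step 3: composable stages are just pure functions on streams.
onlyTxns :: [Event] -> [Event]
onlyTxns = filter isTxn
  where isTxn (Txn _ _) = True
        isTxn _         = False

totalAmount :: [Event] -> Double
totalAmount = sum . map amount
  where amount (Txn _ a) = a
        amount _         = 0

-- Step 4: a pure summary string that the IO layer would print.
summary :: [Event] -> String
summary es = "Total transaction amount: " ++ show (totalAmount (onlyTxns es))
```

Note that only `summary`'s result ever needs to touch IO; every stage stays pure and directly testable.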
```
app/
└── Main.hs           -- Entry point and IO orchestration
src/
├── Core/
│   ├── Event.hs      -- Event ADTs and core domain types
│   ├── Stream.hs     -- Stream representation and helpers
│   └── Pipeline.hs   -- Pipeline composition logic
│
├── Processing/
│   ├── Stages.hs     -- Reusable stages (filter, map, aggregate)
│   └── Pipelines.hs  -- Ready-to-use pipelines for use-cases
│
├── Input/
│   └── Parser.hs     -- Parsing + validation + error handling
│
├── Evaluation/
│   └── Comparison.hs -- Functional vs imperative comparison
│
└── Utils/
    └── Helpers.hs    -- Shared utility functions
data/                 -- Sample input files
docs/                 -- Documentation / notes
test/                 -- Test cases
```
- GHC (Glasgow Haskell Compiler)
- Stack
```
stack build
stack run
```

Sample input files are located in the `data/` directory.
Example format:
```
LOGIN,user1,2025-01-10
ERROR,404,2025-01-10
TRANSACTION,user2,250.50,2025-01-11
```
```
LOGIN,<userId>,<date>
ERROR,<errorCode>,<date>
TRANSACTION,<userId>,<amount>,<date>
```
```
Total login events: 1
Total error events: 1
Total transaction amount: 250.50
```
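A line parser for the three record formats above could be sketched as follows. The constructor names mirror the `Event` ADT shown later in this README, but the field types (`String`, `Double`) and the helper `splitCommas` are assumptions made to keep the sketch dependency-free; the project's actual `Input/Parser.hs` may differ.

```haskell
import Text.Read (readMaybe)

-- Assumed concrete field types; the project may use type aliases instead.
data Event
  = LoginEvent String String               -- user id, date
  | ErrorEvent Int String                  -- error code, date
  | TransactionEvent String Double String  -- user id, amount, date
  deriving (Show, Eq)

-- Tiny hand-rolled stand-in for a CSV splitter (hypothetical helper).
splitCommas :: String -> [String]
splitCommas s = case break (== ',') s of
  (field, ',' : rest) -> field : splitCommas rest
  (field, _)          -> [field]

-- Parse one line; Nothing signals a malformed record.
parseEvent :: String -> Maybe Event
parseEvent line = case splitCommas line of
  ["LOGIN", uid, date]            -> Just (LoginEvent uid date)
  ["ERROR", code, date]           -> ErrorEvent <$> readMaybe code <*> pure date
  ["TRANSACTION", uid, amt, date] -> (\a -> TransactionEvent uid a date) <$> readMaybe amt
  _                               -> Nothing
```

Returning `Maybe Event` keeps validation pure: malformed lines become `Nothing` values that downstream stages can filter out, rather than exceptions raised mid-stream.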
All core processing logic is pure and deterministic.
```haskell
filterStage :: (a -> Bool) -> Pipeline a a
```

Events are modeled with ADTs for safety and extensibility.
```haskell
data Event
  = LoginEvent UserId Timestamp
  | ErrorEvent Int Timestamp
  | TransactionEvent UserId Amount Timestamp
```

Stages are constructed using `map`, `filter`, `foldr`/`foldl`, and function composition.
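One simple way to realize the `Pipeline a b` type from the `filterStage` signature above is as a plain stream-transforming function. This representation is an assumption for illustration (the framework's `Core/Pipeline.hs` may define it differently), but it shows how stages compose with ordinary function composition:

```haskell
-- Assumed representation: a pipeline is just a stream transformer.
type Pipeline a b = [a] -> [b]

filterStage :: (a -> Bool) -> Pipeline a a
filterStage = filter

mapStage :: (a -> b) -> Pipeline a b
mapStage = map

-- Left-to-right stage composition (hypothetical operator name).
(|>) :: Pipeline a b -> Pipeline b c -> Pipeline a c
stage1 |> stage2 = stage2 . stage1

-- Example: keep even numbers, then double them.
evenDoubled :: Pipeline Int Int
evenDoubled = filterStage even |> mapStage (* 2)
```

Because a pipeline is just a function, composition is associative for free and any prefix of a pipeline is itself a valid, independently testable pipeline.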
Streams are never mutated; each stage produces a new derived stream.
Large or infinite streams can be processed efficiently.
```haskell
take 1000 $ runPipeline pipeline infiniteEventStream
```

- IO orchestration lives in `Main.hs`
- parsing and validation are isolated in `Input/Parser.hs`
- pipelines and stages remain pure and easy to unit test
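Because streams are lazy lists, the `take 1000` snippet above terminates even though the source is infinite. A self-contained version with stand-in definitions (numbers instead of real events, and an assumed function-based pipeline) demonstrates this:

```haskell
-- Infinite stream of synthetic "events"; laziness means only the
-- elements actually demanded are ever constructed.
infiniteEventStream :: [Int]
infiniteEventStream = [1 ..]

-- Stand-in definitions; the framework's real types may differ.
runPipeline :: ([a] -> [b]) -> [a] -> [b]
runPipeline stage = stage

pipeline :: [Int] -> [Int]
pipeline = map (* 2) . filter odd

-- Demands exactly 1000 results from the infinite source.
firstThousand :: [Int]
firstThousand = take 1000 (runPipeline pipeline infiniteEventStream)
```

Evaluating `firstThousand` inspects only the first 1999 source elements; the rest of the infinite stream is never built.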
An imperative-style approach (explicit loops and accumulator state) is included to highlight differences in:
- code complexity
- readability
- extensibility
- maintainability
This framework models real event-processing patterns used in industry:
- log analytics and monitoring
- user activity tracking
- transaction summarization
- streaming-style transformations with composable operators
Although intentionally lightweight and academic, the architecture reflects concepts behind larger stream systems, implemented using purely functional principles.
Basic tests verify:
- pipeline composition correctness
- deterministic filtering and aggregation
- predictable results from pure transformations
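Because everything under test is pure, such checks can be written as plain equality assertions without any test framework. A minimal hand-rolled example (pipeline names invented for illustration):

```haskell
-- Two pipelines that should agree on all integer inputs.
pipelineA, pipelineB :: [Int] -> [Int]
pipelineA = map (+ 1) . filter even
pipelineB = filter odd . map (+ 1)  -- n is even iff n + 1 is odd

-- Determinism and composition checks as named Boolean properties.
checks :: [(String, Bool)]
checks =
  [ ("composed pipelines agree", pipelineA [1 .. 10] == pipelineB [1 .. 10])
  , ("results are deterministic", pipelineA [1 .. 10] == pipelineA [1 .. 10])
  ]

-- Report each check; pure logic, IO only at the edge.
runChecks :: IO ()
runChecks = mapM_ report checks
  where report (name, ok) = putStrLn (name ++ ": " ++ if ok then "PASS" else "FAIL")
```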
Run tests (if configured):
```
stack test
```

- Add support for JSON/CSV parsing
- Add windowed aggregation (time windows)
- Add group-by pipelines (per user, per error code)
- Add stream sinks (write results to file)
- Add property-based tests (QuickCheck)
This project shows how Haskell and Functional Programming enable reliable, expressive, and testable stream processing. By leveraging strong types, immutability, higher-order functions, and lazy evaluation, the framework provides a clean approach to event processing suitable for both academic learning and practical design inspiration.