diff --git a/.gitignore b/.gitignore index 7600941..2779ce5 100644 --- a/.gitignore +++ b/.gitignore @@ -16,6 +16,7 @@ dist *.tmp *.dat *.idx +*.csv zig-watch* # local env files diff --git a/billions.zig b/billions.zig index 4d05077..7309179 100644 --- a/billions.zig +++ b/billions.zig @@ -2,7 +2,8 @@ const std = @import("std"); const Codspeed = @import("codspeed"); const tsm = @import("src/tsm/tsm.zig"); -const TsmTree = tsm.TsmTreeImpl(1000, 4096, .gorilla); +const TsmTree = tsm.TsmTreeRuntime; + const metric_name = "bench.cpu.total.billions"; const hosts = [_][]const u8{ "h-0", "h-1", "h-2", "h-3", "h-4", @@ -37,7 +38,7 @@ fn runBenchmark(allocator: std.mem.Allocator) !void { var max_memory_bytes: u64 = 0; - var tree = try TsmTree.init(allocator, metric_name); + var tree = try TsmTree.init(allocator, metric_name, .Native); defer tree.deinit(); const start = std.time.nanoTimestamp(); diff --git a/build.zig b/build.zig index d818313..0c71269 100644 --- a/build.zig +++ b/build.zig @@ -30,15 +30,27 @@ pub fn build(b: *std.Build) void { .target = target, .optimize = optimize, }); + const codspeed = b.dependency("codspeed", .{ .target = target, .optimize = benchmark_optimize, }); + const nats = b.dependency("nats", .{ + .target = target, + .optimize = optimize, + }); + + const toml = b.dependency("toml", .{ + .target = target, + .optimize = optimize, + }); + exe.root_module.addImport("zio", zio.module("zio")); exe.root_module.addImport("dusty", dusty.module("dusty")); exe.root_module.addImport("zware", zware.module("zware")); - + exe.root_module.addImport("nats", nats.module("nats")); + exe.root_module.addImport("toml", toml.module("toml")); b.installArtifact(exe); const run_step = b.step("run", "Run the app"); diff --git a/build.zig.zon b/build.zig.zon index 473c23e..d6ca5de 100644 --- a/build.zig.zon +++ b/build.zig.zon @@ -34,12 +34,12 @@ // internet connectivity. 
.dependencies = .{ .zio = .{ - .url = "git+https://github.com/lalinsky/zio?ref=v0.8.1#d091a8e4ed5849e1eb1bd4a49fd019e48f00546d", - .hash = "zio-0.8.1-xHbVVGpjGADhmfXLbPcE9zPS0rTlCM3s5Z7EPX4chiwx", + .url = "git+https://github.com/lalinsky/zio?ref=v0.8.2#3ac234eae165177cd75742f922851dc5d373fd12", + .hash = "zio-0.8.2-xHbVVC5jGAAYUMZd_BW5eT-HJb_lf-LCGj4PaLOeEkA2", }, .dusty = .{ - .url = "git+https://github.com/lalinsky/dusty#eeaa9a38464b617ee59836bec77820fcc103eb63", - .hash = "dusty-0.0.0-Qdw7RjF_CQDBCV9L-GJlIOJHa7I6rGmp1mKfqV61ocuL", + .url = "git+https://github.com/lalinsky/dusty#7f18e52557fd4748cde9e73a7c4a8a9c7d9b4bb7", + .hash = "dusty-0.0.0-Qdw7RgD0CQCYDLh3FuSf3cIMmvhqpIhDiH6MlNH1DJsA", }, .zware = .{ .url = "git+https://github.com/slunghq/zware#be0d934ed01e59fd48859b14c7074d50c0e98d22", @@ -49,6 +49,14 @@ .url = "git+https://github.com/james-elicx/codspeed-zig#00066158cab242c572368de69db9c58a548e8eb4", .hash = "codspeed-0.0.1-Km8V_DhfBwDNpIj5PHSrZqekEjeVLhrLRUUYjXNsCGXJ", }, + .nats = .{ + .url = "git+https://github.com/lalinsky/nats.zig#c7eb72dbdbf5e71c453c22b66a29356d0e6d2ad4", + .hash = "nats-0.0.0-JvIiUNy6BgBkOmLvuY1cfhjJLPg7CoFGRg6c91XteVqQ", + }, + .toml = .{ + .url = "git+https://github.com/notxorand/zig-toml#cf50bd59c6276fb2b6d34b8d71d35486eecc719c", + .hash = "toml-0.3.0-bV14Bd-EAQBKoXhpYft303BtA2vgLNlxntUCIWgRUl46", + }, }, .paths = .{ "build.zig", diff --git a/config.toml b/config.toml new file mode 100644 index 0000000..12eab31 --- /dev/null +++ b/config.toml @@ -0,0 +1,51 @@ +name = "basic" +lang = "rust" + +[ingest] +method = ["websocket", "nats"] + +[ingest.websocket] +method = "native" +port = 2077 + +[ingest.http] +port = 2078 + +[ingest.nats] +subscriptions = ["slung"] +url = "nats://localhost:4222" + +[ingest.mqtt] +topics = ["slung"] +url = "mqtt://localhost:1883" + +[flush] +method = "native" + +[sync] +method = "s3" + +[sync.s3] +bucket = "slung" + +[sync.r2] +bucket = "slung" + +[sync.socket] +port = 2080 + +[populate] +method = 
["csv", "tdms", "native"] + +[populate.csv] +path = ["/tmp/data.csv"] + +[populate.tdms] +path = ["/tmp/data.tdms"] + +[populate.native] +path = ["/tmp/_data.dat"] + +[tree] +page_size = 4096 +max_level = 100_000 diff --git a/millions.zig b/millions.zig index 0663a24..9f086f2 100644 --- a/millions.zig +++ b/millions.zig @@ -2,7 +2,8 @@ const std = @import("std"); const Codspeed = @import("codspeed"); const tsm = @import("src/tsm/tsm.zig"); -const TsmTree = tsm.TsmTreeImpl(1000, 4096, .gorilla); +const TsmTree = tsm.TsmTreeRuntime; + const metric_name = "bench.cpu.total.millions"; const hosts = [_][]const u8{ "h-0", "h-1", "h-2", "h-3", "h-4", @@ -37,7 +38,7 @@ fn runBenchmark(allocator: std.mem.Allocator) !void { var max_memory_bytes: u64 = 0; - var tree = try TsmTree.init(allocator, metric_name); + var tree = try TsmTree.init(allocator, metric_name, .Native); defer tree.deinit(); const start = std.time.nanoTimestamp(); diff --git a/src/config.zig b/src/config.zig new file mode 100644 index 0000000..46a1874 --- /dev/null +++ b/src/config.zig @@ -0,0 +1,291 @@ +const std = @import("std"); + +const toml = @import("toml"); + +pub const StreamConfig = struct { + ingest: []const Ingest = &.{.{ .WebSocket = .{ .mode = .Native, .port = 2077 } }}, + flush: Flush = .Native, + sync: Sync = .None, + populate: []const Populate = &.{.None}, + + pub const Ingest = union(enum) { + WebSocket: struct { mode: enum { Native, MsgPack }, port: u16 }, + NATS: struct { subscriptions: []const []const u8, url: []const u8 }, + HTTP: struct { port: u16 }, + MQTT: struct { topics: []const []const u8, url: []const u8 }, + }; + + pub const Flush = enum { Native, CSV }; + + pub const Sync = union(enum) { + None, + S3: []const u8, + R2: []const u8, + Socket: []const u8, + }; + + pub const Populate = union(enum) { + None, + Native: []const []const u8, + CSV: []const []const u8, + TDMS: []const []const u8, + }; +}; + +const RawConfig = struct { + name: []const u8, + lang: []const u8, + ingest: 
Ingest, + flush: Flush, + sync: Sync, + populate: Populate, + tree: Tree, + + const Ingest = struct { + method: []const []const u8, + websocket: ?struct { + method: []const u8, + port: u16, + } = null, + http: ?struct { port: u16 } = null, + nats: ?struct { + subscriptions: ?[]const []const u8 = null, + url: []const u8, + } = null, + mqtt: ?struct { + topics: []const []const u8, + url: []const u8, + } = null, + }; + + const Flush = struct { + method: []const u8, + }; + + const Sync = struct { + method: []const u8, + s3: ?struct { bucket: []const u8 } = null, + r2: ?struct { bucket: []const u8 } = null, + socket: ?struct { port: u16 } = null, + }; + + const Populate = struct { + method: []const []const u8, + native: ?struct { path: []const []const u8 } = null, + csv: ?struct { path: []const []const u8 } = null, + tdms: ?struct { path: []const []const u8 } = null, + }; + + const Tree = struct { + page_size: u32, + max_level: u32, + }; +}; + +pub const ParsedStreamConfig = struct { + parsed: toml.Parsed(RawConfig), + value: StreamConfig, + + pub fn deinit(self: *ParsedStreamConfig) void { + self.parsed.deinit(); + } +}; + +fn parseRawConfigFile(allocator: std.mem.Allocator, path: []const u8) !toml.Parsed(RawConfig) { + const content = try std.fs.cwd().readFileAlloc(allocator, path, 64 * 1024 * 1024); + defer allocator.free(content); + + var parser = toml.Parser(RawConfig).init(allocator); + defer parser.deinit(); + + return parser.parseString(content); +} + +pub fn parseFromConfigFile(allocator: std.mem.Allocator, path: []const u8) !ParsedStreamConfig { + var parsed = try parseRawConfigFile(allocator, path); + errdefer parsed.deinit(); + + const cfg = parsed.value; + var stream = StreamConfig{}; + + if (cfg.ingest.method.len == 0) return error.MissingIngestMethod; + const ingest_list = try parsed.arena.allocator().alloc(StreamConfig.Ingest, cfg.ingest.method.len); + for (cfg.ingest.method, 0..) 
|method, i| { + if (std.ascii.eqlIgnoreCase(method, "websocket")) { + const websocket = cfg.ingest.websocket orelse return error.MissingIngestWebSocketConfig; + if (std.ascii.eqlIgnoreCase(websocket.method, "msgpack")) { + ingest_list[i] = .{ .WebSocket = .{ .mode = .MsgPack, .port = websocket.port } }; + } else if (std.ascii.eqlIgnoreCase(websocket.method, "native")) { + ingest_list[i] = .{ .WebSocket = .{ .mode = .Native, .port = websocket.port } }; + } else { + return error.InvalidIngestMethod; + } + } else if (std.ascii.eqlIgnoreCase(method, "nats")) { + const nats = cfg.ingest.nats orelse return error.MissingIngestNatsConfig; + const subscriptions = nats.subscriptions orelse return error.MissingNatsSubscriptions; + if (subscriptions.len == 0) return error.MissingNatsSubscriptions; + ingest_list[i] = .{ .NATS = .{ .subscriptions = subscriptions, .url = nats.url } }; + } else if (std.ascii.eqlIgnoreCase(method, "http")) { + const http = cfg.ingest.http orelse return error.MissingIngestHttpConfig; + ingest_list[i] = .{ .HTTP = .{ .port = http.port } }; + } else if (std.ascii.eqlIgnoreCase(method, "mqtt")) { + const mqtt = cfg.ingest.mqtt orelse return error.MissingIngestMqttConfig; + ingest_list[i] = .{ .MQTT = .{ .topics = mqtt.topics, .url = mqtt.url } }; + } else { + return error.InvalidIngestMethod; + } + } + stream.ingest = ingest_list; + + if (std.ascii.eqlIgnoreCase(cfg.flush.method, "native")) { + stream.flush = .Native; + } else if (std.ascii.eqlIgnoreCase(cfg.flush.method, "csv")) { + stream.flush = .CSV; + } else { + return error.InvalidFlushMethod; + } + + if (std.ascii.eqlIgnoreCase(cfg.sync.method, "none")) { + stream.sync = .None; + } else if (std.ascii.eqlIgnoreCase(cfg.sync.method, "s3")) { + const s3 = cfg.sync.s3 orelse return error.MissingS3Config; + stream.sync = .{ .S3 = s3.bucket }; + } else if (std.ascii.eqlIgnoreCase(cfg.sync.method, "r2")) { + const r2 = cfg.sync.r2 orelse return error.MissingR2Config; + stream.sync = .{ .R2 = r2.bucket 
}; + } else if (std.ascii.eqlIgnoreCase(cfg.sync.method, "socket")) { + const socket = cfg.sync.socket orelse return error.MissingSocketConfig; + const endpoint = try std.fmt.allocPrint(parsed.arena.allocator(), "0.0.0.0:{d}", .{socket.port}); + stream.sync = .{ .Socket = endpoint }; + } else { + return error.InvalidSyncMethod; + } + + if (cfg.populate.method.len == 0) return error.MissingPopulateMethod; + const populate_list = try parsed.arena.allocator().alloc(StreamConfig.Populate, cfg.populate.method.len); + for (cfg.populate.method, 0..) |method, i| { + if (std.ascii.eqlIgnoreCase(method, "none")) { + populate_list[i] = .None; + } else if (std.ascii.eqlIgnoreCase(method, "native")) { + const native_cfg = cfg.populate.native orelse return error.MissingPopulateCsvConfig; + populate_list[i] = .{ .Native = native_cfg.path }; + } else if (std.ascii.eqlIgnoreCase(method, "csv")) { + const csv_cfg = cfg.populate.csv orelse return error.MissingPopulateCsvConfig; + populate_list[i] = .{ .CSV = csv_cfg.path }; + } else if (std.ascii.eqlIgnoreCase(method, "tdms")) { + const tdms_cfg = cfg.populate.tdms orelse return error.MissingPopulateTdmsConfig; + populate_list[i] = .{ .TDMS = tdms_cfg.path }; + } else { + return error.InvalidPopulateMethod; + } + } + stream.populate = populate_list; + + return .{ + .parsed = parsed, + .value = stream, + }; +} + +test "parse mapped stream config from config.toml" { + var parsed = try parseFromConfigFile(std.testing.allocator, "config.toml"); + defer parsed.deinit(); + + const stream = parsed.value; + + try std.testing.expectEqual(@as(usize, 2), stream.ingest.len); + + switch (stream.ingest[0]) { + .WebSocket => |ws| { + try std.testing.expectEqual(.Native, ws.mode); + try std.testing.expectEqual(@as(u16, 2077), ws.port); + }, + else => return error.TestExpectedWebSocketIngest, + } + + switch (stream.ingest[1]) { + .NATS => |nats| { + try std.testing.expect(std.mem.eql(u8, nats.url, "nats://localhost:4222")); + try 
std.testing.expectEqual(@as(usize, 1), nats.subscriptions.len); + try std.testing.expect(std.mem.eql(u8, nats.subscriptions[0], "slung")); + }, + else => return error.TestExpectedNatsIngest, + } + + try std.testing.expectEqual(.Native, stream.flush); + + switch (stream.sync) { + .S3 => |bucket| try std.testing.expect(std.mem.eql(u8, bucket, "slung")), + else => return error.TestExpectedS3Sync, + } + + try std.testing.expectEqual(@as(usize, 3), stream.populate.len); + switch (stream.populate[0]) { + .CSV => |csv_path| try std.testing.expect(std.mem.eql(u8, csv_path[0], "/tmp/data.csv")), + else => return error.TestExpectedCsvPopulate, + } + switch (stream.populate[1]) { + .TDMS => |tdms_path| try std.testing.expect(std.mem.eql(u8, tdms_path[0], "/tmp/data.tdms")), + else => return error.TestExpectedTdmsPopulate, + } + switch (stream.populate[2]) { + .Native => |native_path| try std.testing.expect(std.mem.eql(u8, native_path[0], "/tmp/_data.dat")), + else => return error.TestExpectedNativePopulate, + } +} + +test "parse config.toml cases" { + var parsed = try parseRawConfigFile(std.testing.allocator, "config.toml"); + defer parsed.deinit(); + + const cfg = parsed.value; + + try std.testing.expect(std.mem.eql(u8, cfg.name, "basic")); + try std.testing.expect(std.mem.eql(u8, cfg.lang, "rust")); + + try std.testing.expectEqual(@as(usize, 2), cfg.ingest.method.len); + try std.testing.expect(std.mem.eql(u8, cfg.ingest.method[0], "websocket")); + try std.testing.expect(std.mem.eql(u8, cfg.ingest.method[1], "nats")); + + const websocket = cfg.ingest.websocket orelse return error.TestMissingWebSocketTable; + try std.testing.expect(std.mem.eql(u8, websocket.method, "native")); + try std.testing.expectEqual(@as(u16, 2077), websocket.port); + + const http = cfg.ingest.http orelse return error.TestMissingHttpTable; + try std.testing.expectEqual(@as(u16, 2078), http.port); + + const nats = cfg.ingest.nats orelse return error.TestMissingNatsTable; + const nats_subs = 
nats.subscriptions orelse return error.TestMissingNatsSubscriptions; + try std.testing.expectEqual(@as(usize, 1), nats_subs.len); + try std.testing.expect(std.mem.eql(u8, nats_subs[0], "slung")); + try std.testing.expect(std.mem.eql(u8, nats.url, "nats://localhost:4222")); + + const mqtt = cfg.ingest.mqtt orelse return error.TestMissingMqttTable; + try std.testing.expectEqual(@as(usize, 1), mqtt.topics.len); + try std.testing.expect(std.mem.eql(u8, mqtt.topics[0], "slung")); + try std.testing.expect(std.mem.eql(u8, mqtt.url, "mqtt://localhost:1883")); + + try std.testing.expect(std.mem.eql(u8, cfg.flush.method, "native")); + try std.testing.expect(std.mem.eql(u8, cfg.sync.method, "s3")); + + const s3 = cfg.sync.s3 orelse return error.TestMissingS3; + const r2 = cfg.sync.r2 orelse return error.TestMissingR2; + const socket = cfg.sync.socket orelse return error.TestMissingSocket; + try std.testing.expect(std.mem.eql(u8, s3.bucket, "slung")); + try std.testing.expect(std.mem.eql(u8, r2.bucket, "slung")); + try std.testing.expectEqual(@as(u16, 2080), socket.port); + + try std.testing.expectEqual(@as(usize, 3), cfg.populate.method.len); + try std.testing.expect(std.mem.eql(u8, cfg.populate.method[0], "csv")); + try std.testing.expect(std.mem.eql(u8, cfg.populate.method[1], "tdms")); + + const csv_cfg = cfg.populate.csv orelse return error.TestMissingPopulateCsv; + const tdms_cfg = cfg.populate.tdms orelse return error.TestMissingPopulateTdms; + const native_cfg = cfg.populate.native orelse return error.TestMissingPopulateTdms; + try std.testing.expect(std.mem.eql(u8, csv_cfg.path[0], "/tmp/data.csv")); + try std.testing.expect(std.mem.eql(u8, tdms_cfg.path[0], "/tmp/data.tdms")); + try std.testing.expect(std.mem.eql(u8, native_cfg.path[0], "/tmp/_data.dat")); + + try std.testing.expectEqual(@as(u32, 4096), cfg.tree.page_size); + try std.testing.expectEqual(@as(u32, 100_000), cfg.tree.max_level); +} diff --git a/src/csv.zig b/src/csv.zig new file mode 100644 index 
0000000..47ff833 --- /dev/null +++ b/src/csv.zig @@ -0,0 +1,340 @@ +//! RFC 4180 compliant CSV reader and writer +//! +//! Credit: https://github.com/melihbirim/csvq + +const std = @import("std"); +const Allocator = std.mem.Allocator; + +/// RFC 4180 compliant CSV reader +pub const CsvReader = struct { + file: std.fs.File, + allocator: Allocator, + delimiter: u8, + buffer: [262144]u8, // Increased to 256KB for fewer syscalls + buffer_pos: usize, + buffer_len: usize, + eof: bool, + putback_byte: ?u8, + + pub fn init(allocator: Allocator, file: std.fs.File) CsvReader { + return CsvReader{ + .file = file, + .allocator = allocator, + .delimiter = ',', + .buffer = undefined, + .buffer_pos = 0, + .buffer_len = 0, + .eof = false, + .putback_byte = null, + }; + } + + fn readByte(self: *CsvReader) !?u8 { + // Check if there's a putback byte first + if (self.putback_byte) |byte| { + self.putback_byte = null; + return byte; + } + + if (self.buffer_pos >= self.buffer_len) { + if (self.eof) return null; + self.buffer_len = try self.file.read(&self.buffer); + self.buffer_pos = 0; + if (self.buffer_len == 0) { + self.eof = true; + return null; + } + } + const byte = self.buffer[self.buffer_pos]; + self.buffer_pos += 1; + return byte; + } + + fn putBackByte(self: *CsvReader, byte: u8) void { + self.putback_byte = byte; + } + + /// Read the next CSV record + pub fn readRecord(self: *CsvReader) !?[][]u8 { + var fields = std.ArrayList([]u8){}; + errdefer { + for (fields.items) |field| { + self.allocator.free(field); + } + fields.deinit(self.allocator); + } + + var field_buffer = std.ArrayList(u8){}; + defer field_buffer.deinit(self.allocator); + + var in_quotes = false; + var at_start = true; + + while (true) { + const byte_opt = try self.readByte(); + const byte = byte_opt orelse { + // EOF - handle last field + if (field_buffer.items.len > 0 or fields.items.len > 0 or !at_start) { + try fields.append(self.allocator, try field_buffer.toOwnedSlice(self.allocator)); + } + if 
(fields.items.len == 0) { + return null; + } + return try fields.toOwnedSlice(self.allocator); + }; + + at_start = false; + + if (in_quotes) { + if (byte == '"') { + // Check for escaped quote + const next_opt = try self.readByte(); + if (next_opt) |next| { + if (next == '"') { + // Escaped quote + try field_buffer.append(self.allocator, '"'); + } else { + // End of quoted field + in_quotes = false; + // Put back the byte + self.putBackByte(next); + } + } else { + // EOF after quote + in_quotes = false; + } + } else { + try field_buffer.append(self.allocator, byte); + } + } else { + if (byte == '"' and field_buffer.items.len == 0) { + // Start of quoted field + in_quotes = true; + } else if (byte == self.delimiter) { + // End of field + try fields.append(self.allocator, try field_buffer.toOwnedSlice(self.allocator)); + field_buffer = std.ArrayList(u8){}; + } else if (byte == '\r') { + // Handle CR - check for LF + const next_opt = try self.readByte(); + if (next_opt) |next| { + if (next != '\n') { + self.putBackByte(next); + } + } + // End of record + try fields.append(self.allocator, try field_buffer.toOwnedSlice(self.allocator)); + return try fields.toOwnedSlice(self.allocator); + } else if (byte == '\n') { + // End of record + try fields.append(self.allocator, try field_buffer.toOwnedSlice(self.allocator)); + return try fields.toOwnedSlice(self.allocator); + } else { + try field_buffer.append(self.allocator, byte); + } + } + } + } + + /// Free a record returned by readRecord + pub fn freeRecord(self: *CsvReader, record: [][]u8) void { + for (record) |field| { + self.allocator.free(field); + } + self.allocator.free(record); + } +}; + +/// Fast CSV reader for simple cases (no quotes) +pub const FastCsvReader = struct { + file: std.fs.File, + allocator: Allocator, + delimiter: u8, + line_buffer: std.ArrayList(u8), + buffer: [262144]u8, // Increased to 256KB to match CsvReader + buffer_pos: usize, + buffer_len: usize, + eof: bool, + + pub fn init(allocator: 
Allocator, file: std.fs.File) FastCsvReader { + return FastCsvReader{ + .file = file, + .allocator = allocator, + .delimiter = ',', + .line_buffer = std.ArrayList(u8){}, + .buffer = undefined, + .buffer_pos = 0, + .buffer_len = 0, + .eof = false, + }; + } + + fn readByte(self: *FastCsvReader) !?u8 { + if (self.buffer_pos >= self.buffer_len) { + if (self.eof) return null; + self.buffer_len = try self.file.read(&self.buffer); + self.buffer_pos = 0; + if (self.buffer_len == 0) { + self.eof = true; + return null; + } + } + const byte = self.buffer[self.buffer_pos]; + self.buffer_pos += 1; + return byte; + } + + pub fn deinit(self: *FastCsvReader) void { + self.line_buffer.deinit(self.allocator); + } + + /// Read the next CSV record (fast path - assumes no escaped quotes) + pub fn readRecord(self: *FastCsvReader) !?[][]u8 { + self.line_buffer.clearRetainingCapacity(); + + // Read line byte by byte until \n + while (try self.readByte()) |byte| { + if (byte == '\n') break; + try self.line_buffer.append(self.allocator, byte); + } + + if (self.line_buffer.items.len == 0 and self.eof) { + return null; + } + + // Trim trailing \r if present + if (self.line_buffer.items.len > 0 and self.line_buffer.items[self.line_buffer.items.len - 1] == '\r') { + _ = self.line_buffer.pop(); + } + + // Split by delimiter + var fields = std.ArrayList([]u8){}; + errdefer { + for (fields.items) |field| { + self.allocator.free(field); + } + fields.deinit(self.allocator); + } + + var iter = std.mem.splitScalar(u8, self.line_buffer.items, self.delimiter); + while (iter.next()) |field| { + try fields.append(self.allocator, try self.allocator.dupe(u8, field)); + } + + return try fields.toOwnedSlice(self.allocator); + } + + /// Free a record returned by readRecord + pub fn freeRecord(self: *FastCsvReader, record: [][]u8) void { + for (record) |field| { + self.allocator.free(field); + } + self.allocator.free(record); + } +}; + +/// CSV writer with buffering +pub const CsvWriter = struct { + file: 
std.fs.File, + delimiter: u8, + buffer: [1048576]u8, // 1MB buffer for fewer write syscalls + buffer_pos: usize, + + pub fn init(file: std.fs.File) CsvWriter { + return CsvWriter{ + .file = file, + .delimiter = ',', + .buffer = undefined, + .buffer_pos = 0, + }; + } + + pub fn writeToBuffer(self: *CsvWriter, data: []const u8) !void { + var remaining = data; + while (remaining.len > 0) { + const space_left = self.buffer.len - self.buffer_pos; + if (space_left == 0) { + try self.flush(); + continue; + } + + const to_copy = @min(remaining.len, space_left); + @memcpy(self.buffer[self.buffer_pos..][0..to_copy], remaining[0..to_copy]); + self.buffer_pos += to_copy; + remaining = remaining[to_copy..]; + } + } + + pub fn writeRecord(self: *CsvWriter, fields: []const []const u8) !void { + for (fields, 0..) |field, i| { + if (i > 0) { + try self.writeToBuffer(&[_]u8{self.delimiter}); + } + + // Check if field needs quoting + const needs_quotes = std.mem.indexOfAny(u8, field, ",\"\r\n") != null; + if (needs_quotes) { + try self.writeToBuffer("\""); + // Escape quotes + for (field) |c| { + if (c == '"') { + try self.writeToBuffer("\"\""); + } else { + try self.writeToBuffer(&[_]u8{c}); + } + } + try self.writeToBuffer("\""); + } else { + try self.writeToBuffer(field); + } + } + try self.writeToBuffer("\n"); + } + + pub fn flush(self: *CsvWriter) !void { + if (self.buffer_pos > 0) { + try self.file.writeAll(self.buffer[0..self.buffer_pos]); + self.buffer_pos = 0; + } + } +}; + +test "csv reader simple" { + const allocator = std.testing.allocator; + + // Create a temporary file + const file_path = "test_csv_simple.csv"; + var file = try std.fs.cwd().createFile(file_path, .{ .read = true }); + defer { + file.close(); + std.fs.cwd().deleteFile(file_path) catch {}; + } + + // Write CSV data + try file.writeAll("name,age\nAlice,30\nBob,25\n"); + try file.seekTo(0); + + // Read CSV + var reader = CsvReader.init(allocator, file); + + // Read header + const header = (try 
reader.readRecord()).?; + defer reader.freeRecord(header); + try std.testing.expectEqualStrings("name", header[0]); + try std.testing.expectEqualStrings("age", header[1]); + + // Read first row + const row1 = (try reader.readRecord()).?; + defer reader.freeRecord(row1); + try std.testing.expectEqualStrings("Alice", row1[0]); + try std.testing.expectEqualStrings("30", row1[1]); + + // Read second row + const row2 = (try reader.readRecord()).?; + defer reader.freeRecord(row2); + try std.testing.expectEqualStrings("Bob", row2[0]); + try std.testing.expectEqualStrings("25", row2[1]); + + // No more rows + try std.testing.expect(try reader.readRecord() == null); +} diff --git a/src/ds/bloom.zig b/src/ds/bloom.zig index bdda97b..b020407 100644 --- a/src/ds/bloom.zig +++ b/src/ds/bloom.zig @@ -9,9 +9,10 @@ //! There's plans for a counting bloom filter and eventually a cuckoo filter. const std = @import("std"); -const bitmap = @import("bitmap.zig"); const testing = std.testing; +const bitmap = @import("bitmap.zig"); + fn BloomImpl(comptime size: usize, comptime HashFn: type) type { return struct { const Self = @This(); diff --git a/src/host/execute.zig b/src/host/execute.zig index a58cf94..d4f9451 100644 --- a/src/host/execute.zig +++ b/src/host/execute.zig @@ -1,10 +1,12 @@ const std = @import("std"); +const Allocator = std.mem.Allocator; + const zware = @import("zware"); -const host = @import("host.zig"); const Store = zware.Store; const Module = zware.Module; const Instance = zware.Instance; -const Allocator = std.mem.Allocator; + +const host = @import("host.zig"); pub fn spawn(allocator: Allocator, bytes: []const u8, context_ptr: usize) !void { var store = Store.init(allocator); diff --git a/src/host/host.zig b/src/host/host.zig index cb2246c..a4a718f 100644 --- a/src/host/host.zig +++ b/src/host/host.zig @@ -1,7 +1,9 @@ const std = @import("std"); + const http = @import("dusty"); const zio = @import("zio"); const zware = @import("zware"); + const AppContext = 
@import("../main.zig").AppContext; const Query = @import("../query.zig").Query; const QueryOp = @import("../tsm/tsm.zig").TsmTree.QueryOp; diff --git a/src/main.zig b/src/main.zig index e2ac928..22109ca 100644 --- a/src/main.zig +++ b/src/main.zig @@ -1,19 +1,25 @@ const std = @import("std"); -const builtin = @import("builtin"); -const zio = @import("zio"); -const http = @import("dusty"); const testing = std.testing; -const ds = @import("ds/ds.zig"); -const tsm = @import("tsm/tsm.zig"); -const query = @import("query.zig"); const net = std.net; -const execute = @import("host/execute.zig"); const ArrayList = std.ArrayList; const AutoHashMap = std.AutoHashMap; +const builtin = @import("builtin"); + +const http = @import("dusty"); +const zio = @import("zio"); const Channel = zio.Channel; -const Query = query.Query; -const TsmTree = tsm.TsmTree; const Notify = zio.Notify; + +const config = @import("config.zig"); +pub const StreamConfig = config.StreamConfig; +const csv = @import("csv.zig"); +const ds = @import("ds/ds.zig"); +const execute = @import("host/execute.zig"); +const query = @import("query.zig"); +const Query = query.Query; +const tsm = @import("tsm/tsm.zig"); +const TsmTree = tsm.TsmTreeRuntime; + const CHANNEL_CAPACITY = 8192 * 2; pub const AppContext = struct { @@ -46,6 +52,7 @@ const Server = struct { next_query_id: std.atomic.Value(u32), tree: *TsmTree, notify: *Notify, + stream_config: *StreamConfig, const ChannelData = struct { /// to be used as id if not streamed with data @@ -59,6 +66,7 @@ const Server = struct { channel: *Channel(ChannelData), tree: *TsmTree, notify: *Notify, + stream_config: *StreamConfig, ) !Server { return Server{ .allocator = context.io.allocator, @@ -76,6 +84,7 @@ const Server = struct { .next_query_id = std.atomic.Value(u32).init(1), .tree = tree, .notify = notify, + .stream_config = stream_config, }; } @@ -400,10 +409,9 @@ pub fn handleWasm(allocator: std.mem.Allocator, context: *AppContext) !void { while (args.next()) |arg| { 
if (std.mem.eql(u8, arg, "--wasm")) { - wasm_path = args.next() orelse return error.InvalidArguments; + wasm_path = args.next() orelse return error.InvalidWasmPath; continue; } - return error.InvalidArguments; } const path = wasm_path orelse @panic("Set the path to the Wasm file with --wasm "); const bytes = try std.fs.cwd().readFileAlloc(allocator, path, 64 * 1024 * 1024); @@ -588,6 +596,25 @@ fn handleWasmWebsocket(context: *AppContext) !void { } } +fn loadConfig(allocator: std.mem.Allocator, stream_config: *StreamConfig) !void { + var args = try std.process.argsWithAllocator(allocator); + defer args.deinit(); + + _ = args.next(); + var config_path: []const u8 = "./config.toml"; + + while (args.next()) |arg| { + if (std.mem.eql(u8, arg, "--config")) { + config_path = args.next() orelse return error.InvalidConfigPath; + continue; + } + } + + const parsed_config = config.parseFromConfigFile(allocator, config_path) catch return; + + stream_config.* = (parsed_config).value; +} + pub fn main() !void { var gpa = std.heap.DebugAllocator(.{}).init; var buffer: [2 * 1024 * 1024 * 1024]u8 = undefined; @@ -612,12 +639,21 @@ pub fn main() !void { defer allocator.free(channel_buffer); var channel = Channel(Server.ChannelData).init(channel_buffer[0..]); - var tree = try TsmTree.init(allocator, "demo"); + // TODO: parse from slung.toml or cli argument + var stream_config = StreamConfig{}; + try loadConfig(allocator, &stream_config); + + const backend: tsm.TsmTreeRuntime.EntryBackend = switch (stream_config.flush) { + .Native => .Native, + .CSV => .CSV, + }; + + var tree = try TsmTree.init(allocator, "demo", backend); defer tree.deinit(); var notify = Notify.init; - var server = try Server.init(&context, &channel, &tree, &notify); + var server = try Server.init(&context, &channel, &tree, &notify, &stream_config); context.server = &server; defer server.deinit(); @@ -629,6 +665,8 @@ pub fn main() !void { } test { + _ = csv; + _ = config; _ = ds; _ = tsm; _ = query; diff --git
a/src/query.zig b/src/query.zig index b8b5236..29c3555 100644 --- a/src/query.zig +++ b/src/query.zig @@ -24,6 +24,7 @@ //! SUM:series6:[tag9] const std = @import("std"); + const PollState = @import("host/host.zig").PollState; pub const Query = struct { diff --git a/src/tsm/cache.zig b/src/tsm/cache.zig index 818b0d4..2fdbccc 100644 --- a/src/tsm/cache.zig +++ b/src/tsm/cache.zig @@ -1,16 +1,17 @@ const std = @import("std"); const testing = std.testing; -const ds = @import("../ds/ds.zig"); const Allocator = std.mem.Allocator; const HashMap = std.StringArrayHashMap; + +const ds = @import("../ds/ds.zig"); const Skiplist = ds.skiplist.SkipList; const Bloom = ds.bloom.Bloom; const Hasher = ds.bloom.DefaultHashFn; const entry_mod = @import("entry.zig"); -const DiskEntry = entry_mod.DiskEntry; -const TimestampEncoding = entry_mod.TimestampEncoding; +const types = @import("types.zig"); +const TimestampEncoding = types.TimestampEncoding; -fn CacheImpl(comptime page_size: u32, comptime ts_encoding: TimestampEncoding) type { +pub fn CacheImplWithEntry(comptime page_size: u32, comptime ts_encoding: TimestampEncoding, comptime DiskEntry: fn (comptime u32, comptime TimestampEncoding) type) type { return struct { const Self = @This(); @@ -19,26 +20,8 @@ fn CacheImpl(comptime page_size: u32, comptime ts_encoding: TimestampEncoding) t bloom: Bloom(1024, Hasher), count: u64 = 0, - pub const DataPoint = struct { - timestamp: i64, - value: Value, - }; - - pub const Value = union(enum) { - Bool: bool, - Int: i64, - Float: f64, - Bytes: []const u8, - - pub fn compare(self: Value, b: Value) std.math.Order { - return switch (self) { - .Int => |val| std.math.order(val, b.Int), - .Float => |val| std.math.order(val, b.Float), - .Bytes => |val| std.mem.order(u8, val, b.Bytes), - .Bool => |val| std.math.order(@intFromBool(val), @intFromBool(b.Bool)), - }; - } - }; + pub const DataPoint = types.DataPoint; + pub const Value = types.Value; pub fn init(allocator: Allocator) Self { return Self{ @@ 
-108,6 +91,10 @@ fn CacheImpl(comptime page_size: u32, comptime ts_encoding: TimestampEncoding) t }; } +pub fn CacheImpl(comptime page_size: u32, comptime ts_encoding: TimestampEncoding) type { + return CacheImplWithEntry(page_size, ts_encoding, entry_mod.DiskEntry); +} + pub const Cache = CacheImpl; test "Cache" { diff --git a/src/tsm/entry.zig b/src/tsm/entry.zig index 0d6cfe6..9daa921 100644 --- a/src/tsm/entry.zig +++ b/src/tsm/entry.zig @@ -1,21 +1,15 @@ const std = @import("std"); const fs = std.fs; -const ds = @import("../ds/ds.zig"); const Allocator = std.mem.Allocator; const HashMap = std.StringHashMap; const AutoHashMap = std.AutoHashMap; -const cache_mod = @import("cache.zig"); -const Cache = cache_mod.Cache; + +const ds = @import("../ds/ds.zig"); const Bloom = ds.bloom.Bloom; const Hasher = ds.bloom.DefaultHashFn; const gorilla = @import("gorilla.zig"); - -pub const TimestampEncoding = enum { - /// Zigzag varint delta encoding - faster queries - delta, - /// Delta-of-delta bit encoding - better compression - gorilla, -}; +const types = @import("types.zig"); +pub const TimestampEncoding = types.TimestampEncoding; fn DiskEntryImpl(comptime page_size: u32, comptime ts_encoding: TimestampEncoding) type { return struct { @@ -74,9 +68,9 @@ fn DiskEntryImpl(comptime page_size: u32, comptime ts_encoding: TimestampEncodin len: u64, }; - const Value = Cache(page_size, ts_encoding).Value; + const Value = types.Value; - pub fn flush(allocator: Allocator, cache: *Cache(page_size, ts_encoding), file_path: []const u8, level: usize) !*Self { + pub fn flush(allocator: Allocator, cache: anytype, file_path: []const u8, level: usize) !*Self { var entry = try allocator.create(Self); entry.allocator = allocator; entry.file_path = try allocator.dupe(u8, file_path); diff --git a/src/tsm/entry_csv.zig b/src/tsm/entry_csv.zig new file mode 100644 index 0000000..70d1ac5 --- /dev/null +++ b/src/tsm/entry_csv.zig @@ -0,0 +1,453 @@ +const std = @import("std"); +const fs = std.fs; 
+const Allocator = std.mem.Allocator; +const HashMap = std.StringHashMap; +const AutoHashMap = std.AutoHashMap; + +const csv = @import("../csv.zig"); +const ds = @import("../ds/ds.zig"); +const Bloom = ds.bloom.Bloom; +const Hasher = ds.bloom.DefaultHashFn; +const types = @import("types.zig"); +pub const TimestampEncoding = types.TimestampEncoding; + +pub fn DiskEntryImpl(comptime page_size: u32, comptime ts_encoding: TimestampEncoding) type { + return struct { + const Self = @This(); + + pub const timestamp_encoding = ts_encoding; + + allocator: Allocator, + file_path: []const u8, + file_data: fs.File, + file_index: fs.File, + + index_row: HashMap(u64), + index_column: HashMap(u64), + index_series: HashMap([2]u64), + bloom: Bloom(1024, Hasher), + + metadata: Metadata, + column_descriptors: []ColumnDescriptor, + + cached_row_offsets: []?AutoHashMap(u64, RowOffset), + cached_page_descriptors: []?[]PageDescriptor, + + const Value = types.Value; + + pub const Metadata = struct { + number_rows: u64, + number_columns: u32, + created_at: i64, + page_size: u32, + version: u32, + min_timestamp: i64, + max_timestamp: i64, + }; + + pub const ColumnDescriptor = struct { + name: []const u8, + data_offset: u64, + data_size: u64, + num_pages: u32, + offset_index_page: u64, + offset_offsets_row: u64, + }; + + pub const PageDescriptor = struct { + data_offset: u64, + data_size: u32, + start_row: u64, + end_row: u64, + }; + + pub const RowOffset = struct { + row_id: u64, + page_id: u64, + offset: u64, + len: u64, + }; + + pub fn flush(allocator: Allocator, cache: anytype, file_path: []const u8, level: usize) !*Self { + var entry = try allocator.create(Self); + errdefer allocator.destroy(entry); + + entry.allocator = allocator; + entry.file_path = try allocator.dupe(u8, file_path); + errdefer allocator.free(entry.file_path); + + const path_data = try std.fmt.allocPrint(allocator, "_{d}_{s}.csv", .{ level, file_path }); + defer allocator.free(path_data); + entry.file_data = try 
fs.cwd().createFile(path_data, .{ .read = true }); + errdefer entry.file_data.close(); + + const path_index = try std.fmt.allocPrint(allocator, "_{d}_{s}.idx.csv", .{ level, file_path }); + defer allocator.free(path_index); + entry.file_index = try fs.cwd().createFile(path_index, .{ .read = true }); + errdefer entry.file_index.close(); + + entry.index_row = HashMap(u64).init(allocator); + entry.index_column = HashMap(u64).init(allocator); + entry.index_series = HashMap([2]u64).init(allocator); + entry.bloom = Bloom(1024, Hasher).init(); + + entry.column_descriptors = try allocator.alloc(ColumnDescriptor, 2); + errdefer allocator.free(entry.column_descriptors); + entry.column_descriptors[0] = .{ .name = try allocator.dupe(u8, "time"), .data_offset = 0, .data_size = 0, .num_pages = 0, .offset_index_page = 0, .offset_offsets_row = 0 }; + entry.column_descriptors[1] = .{ .name = try allocator.dupe(u8, "value"), .data_offset = 0, .data_size = 0, .num_pages = 0, .offset_index_page = 0, .offset_offsets_row = 0 }; + errdefer { + allocator.free(entry.column_descriptors[0].name); + allocator.free(entry.column_descriptors[1].name); + } + + try entry.index_column.put(try allocator.dupe(u8, "time"), 0); + try entry.index_column.put(try allocator.dupe(u8, "value"), 1); + + entry.cached_row_offsets = try allocator.alloc(?AutoHashMap(u64, RowOffset), 2); + @memset(entry.cached_row_offsets, null); + entry.cached_page_descriptors = try allocator.alloc(?[]PageDescriptor, 2); + @memset(entry.cached_page_descriptors, null); + + var total_rows: u64 = 0; + var min_ts: i64 = std.math.maxInt(i64); + var max_ts: i64 = std.math.minInt(i64); + + var data_writer = csv.CsvWriter.init(entry.file_data); + try data_writer.writeRecord(&[_][]const u8{ "row_id", "series_key", "timestamp", "value_type", "value_data" }); + + var series_iter = cache.index_series.iterator(); + while (series_iter.next()) |series| { + const series_start = total_rows; + var node_iter: ?*@TypeOf(series.value_ptr.*).Node = 
series.value_ptr.head.next(); + while (node_iter) |node| : (node_iter = node.next()) { + const row_id_s = try std.fmt.allocPrint(allocator, "{d}", .{total_rows}); + defer allocator.free(row_id_s); + + const ts_s = try std.fmt.allocPrint(allocator, "{d}", .{node.key}); + defer allocator.free(ts_s); + + const value_type_s = valueTypeTag(node.value); + const value_data_s = try valueDataToString(allocator, node.value); + defer allocator.free(value_data_s); + + try data_writer.writeRecord(&[_][]const u8{ row_id_s, series.key_ptr.*, ts_s, value_type_s, value_data_s }); + + if (node.key < min_ts) min_ts = node.key; + if (node.key > max_ts) max_ts = node.key; + total_rows += 1; + } + + if (total_rows > series_start) { + const owned_key = try allocator.dupe(u8, series.key_ptr.*); + try entry.index_series.put(owned_key, .{ series_start, total_rows - 1 }); + entry.bloom.insert(series.key_ptr.*); + } + } + try data_writer.flush(); + + if (total_rows == 0) { + min_ts = 0; + max_ts = 0; + } + + entry.metadata = .{ + .number_rows = total_rows, + .number_columns = 2, + .created_at = std.time.timestamp(), + .page_size = page_size, + .version = 1, + .min_timestamp = min_ts, + .max_timestamp = max_ts, + }; + + var index_writer = csv.CsvWriter.init(entry.file_index); + try index_writer.writeRecord(&[_][]const u8{ "kind", "k1", "k2", "k3" }); + + try writeMetaRecord(allocator, &index_writer, "number_rows", entry.metadata.number_rows); + try writeMetaRecord(allocator, &index_writer, "number_columns", entry.metadata.number_columns); + try writeMetaRecord(allocator, &index_writer, "created_at", entry.metadata.created_at); + try writeMetaRecord(allocator, &index_writer, "page_size", entry.metadata.page_size); + try writeMetaRecord(allocator, &index_writer, "version", entry.metadata.version); + try writeMetaRecord(allocator, &index_writer, "min_timestamp", entry.metadata.min_timestamp); + try writeMetaRecord(allocator, &index_writer, "max_timestamp", entry.metadata.max_timestamp); + + try 
index_writer.writeRecord(&[_][]const u8{ "column", "0", "time", "" }); + try index_writer.writeRecord(&[_][]const u8{ "column", "1", "value", "" }); + + var idx_series_iter = entry.index_series.iterator(); + while (idx_series_iter.next()) |kv| { + const start_s = try std.fmt.allocPrint(allocator, "{d}", .{kv.value_ptr.*[0]}); + defer allocator.free(start_s); + const end_s = try std.fmt.allocPrint(allocator, "{d}", .{kv.value_ptr.*[1]}); + defer allocator.free(end_s); + try index_writer.writeRecord(&[_][]const u8{ "series", kv.key_ptr.*, start_s, end_s }); + } + + try index_writer.flush(); + + entry.file_data.close(); + entry.file_index.close(); + entry.file_data = try fs.cwd().openFile(path_data, .{}); + entry.file_index = try fs.cwd().openFile(path_index, .{}); + + return entry; + } + + pub fn open(allocator: Allocator, file_path: []const u8, level: usize) !*Self { + var entry = try allocator.create(Self); + errdefer allocator.destroy(entry); + + entry.allocator = allocator; + entry.file_path = try allocator.dupe(u8, file_path); + errdefer allocator.free(entry.file_path); + + const path_data = try std.fmt.allocPrint(allocator, "_{d}_{s}.csv", .{ level, file_path }); + defer allocator.free(path_data); + entry.file_data = try fs.cwd().openFile(path_data, .{}); + errdefer entry.file_data.close(); + + const path_index = try std.fmt.allocPrint(allocator, "_{d}_{s}.idx.csv", .{ level, file_path }); + defer allocator.free(path_index); + entry.file_index = try fs.cwd().openFile(path_index, .{}); + errdefer entry.file_index.close(); + + entry.index_row = HashMap(u64).init(allocator); + entry.index_column = HashMap(u64).init(allocator); + entry.index_series = HashMap([2]u64).init(allocator); + entry.bloom = Bloom(1024, Hasher).init(); + + entry.column_descriptors = try allocator.alloc(ColumnDescriptor, 2); + errdefer allocator.free(entry.column_descriptors); + entry.column_descriptors[0] = .{ .name = try allocator.dupe(u8, "time"), .data_offset = 0, .data_size = 0, 
.num_pages = 0, .offset_index_page = 0, .offset_offsets_row = 0 }; + entry.column_descriptors[1] = .{ .name = try allocator.dupe(u8, "value"), .data_offset = 0, .data_size = 0, .num_pages = 0, .offset_index_page = 0, .offset_offsets_row = 0 }; + errdefer { + allocator.free(entry.column_descriptors[0].name); + allocator.free(entry.column_descriptors[1].name); + } + + try entry.index_column.put(try allocator.dupe(u8, "time"), 0); + try entry.index_column.put(try allocator.dupe(u8, "value"), 1); + + entry.cached_row_offsets = try allocator.alloc(?AutoHashMap(u64, RowOffset), 2); + @memset(entry.cached_row_offsets, null); + entry.cached_page_descriptors = try allocator.alloc(?[]PageDescriptor, 2); + @memset(entry.cached_page_descriptors, null); + + entry.metadata = .{ + .number_rows = 0, + .number_columns = 2, + .created_at = 0, + .page_size = page_size, + .version = 1, + .min_timestamp = 0, + .max_timestamp = 0, + }; + + try entry.loadIndexFromCsv(); + return entry; + } + + pub fn deinit(self: *Self) void { + self.file_data.close(); + self.file_index.close(); + + self.allocator.free(self.file_path); + + self.index_row.deinit(); + + var column_key_iter = self.index_column.iterator(); + while (column_key_iter.next()) |entry| { + self.allocator.free(entry.key_ptr.*); + } + self.index_column.deinit(); + + var series_key_iter = self.index_series.iterator(); + while (series_key_iter.next()) |entry| { + self.allocator.free(entry.key_ptr.*); + } + self.index_series.deinit(); + + self.bloom.deinit(); + + for (self.column_descriptors) |column_descriptor| { + self.allocator.free(column_descriptor.name); + } + self.allocator.free(self.column_descriptors); + + for (self.cached_row_offsets) |*cached| { + if (cached.*) |*map| { + map.deinit(); + } + } + self.allocator.free(self.cached_row_offsets); + + for (self.cached_page_descriptors) |cached| { + if (cached) |descs| { + self.allocator.free(descs); + } + } + self.allocator.free(self.cached_page_descriptors); + + 
self.allocator.destroy(self); + } + + pub fn mayContainSeries(self: *Self, series_key: []const u8) bool { + return self.bloom.contains(series_key); + } + + pub fn getColumn(self: *Self, column_name: []const u8) ![]Value { + const column_id = self.index_column.get(column_name) orelse return error.ColumnNotFound; + return self.getColumnById(column_id); + } + + pub fn getColumnRange(self: *Self, column_name: []const u8, start_row: u64, end_row: u64) ![]Value { + const column_id = self.index_column.get(column_name) orelse return error.ColumnNotFound; + return self.getColumnRangeById(@intCast(column_id), start_row, end_row); + } + + pub fn getColumnById(self: *Self, column_id: u64) ![]Value { + if (self.metadata.number_rows == 0) { + return self.allocator.alloc(Value, 0); + } + return self.getColumnRangeById(@intCast(column_id), 0, self.metadata.number_rows - 1); + } + + pub fn getColumnRangeById(self: *Self, column_id: usize, start_row: u64, end_row: u64) ![]Value { + if (column_id >= self.column_descriptors.len) return error.InvalidColumnId; + if (start_row > end_row) return error.InvalidRange; + if (self.metadata.number_rows == 0) return self.allocator.alloc(Value, 0); + if (end_row >= self.metadata.number_rows) return error.InvalidRange; + + try self.file_data.seekTo(0); + var reader = csv.CsvReader.init(self.allocator, self.file_data); + + const header = (try reader.readRecord()) orelse return error.InvalidFileFormat; + reader.freeRecord(header); + + var values: std.ArrayList(Value) = .empty; + defer values.deinit(self.allocator); + + while (try reader.readRecord()) |record| { + defer reader.freeRecord(record); + if (record.len < 5) continue; + + const row_id = std.fmt.parseInt(u64, record[0], 10) catch continue; + if (row_id < start_row or row_id > end_row) continue; + + if (column_id == 0) { + const ts = std.fmt.parseInt(i64, record[2], 10) catch continue; + try values.append(self.allocator, Value{ .Int = ts }); + } else if (column_id == 1) { + const v = try 
parseValueFromStrings(self.allocator, record[3], record[4]); + try values.append(self.allocator, v); + } else { + return error.InvalidColumnId; + } + } + + return values.toOwnedSlice(self.allocator); + } + + fn loadIndexFromCsv(self: *Self) !void { + try self.file_index.seekTo(0); + var reader = csv.CsvReader.init(self.allocator, self.file_index); + + const header = (try reader.readRecord()) orelse return; + reader.freeRecord(header); + + while (try reader.readRecord()) |record| { + defer reader.freeRecord(record); + if (record.len < 4) continue; + + if (std.mem.eql(u8, record[0], "meta")) { + if (std.mem.eql(u8, record[1], "number_rows")) self.metadata.number_rows = std.fmt.parseInt(u64, record[2], 10) catch self.metadata.number_rows; + if (std.mem.eql(u8, record[1], "number_columns")) self.metadata.number_columns = std.fmt.parseInt(u32, record[2], 10) catch self.metadata.number_columns; + if (std.mem.eql(u8, record[1], "created_at")) self.metadata.created_at = std.fmt.parseInt(i64, record[2], 10) catch self.metadata.created_at; + if (std.mem.eql(u8, record[1], "page_size")) self.metadata.page_size = std.fmt.parseInt(u32, record[2], 10) catch self.metadata.page_size; + if (std.mem.eql(u8, record[1], "version")) self.metadata.version = std.fmt.parseInt(u32, record[2], 10) catch self.metadata.version; + if (std.mem.eql(u8, record[1], "min_timestamp")) self.metadata.min_timestamp = std.fmt.parseInt(i64, record[2], 10) catch self.metadata.min_timestamp; + if (std.mem.eql(u8, record[1], "max_timestamp")) self.metadata.max_timestamp = std.fmt.parseInt(i64, record[2], 10) catch self.metadata.max_timestamp; + } else if (std.mem.eql(u8, record[0], "series")) { + const start_row = std.fmt.parseInt(u64, record[2], 10) catch continue; + const end_row = std.fmt.parseInt(u64, record[3], 10) catch continue; + const key = try self.allocator.dupe(u8, record[1]); + try self.index_series.put(key, .{ start_row, end_row }); + self.bloom.insert(record[1]); + } + } + } + + fn 
writeMetaRecord(allocator: Allocator, writer: *csv.CsvWriter, key: []const u8, value: anytype) !void { + const v = try std.fmt.allocPrint(allocator, "{d}", .{value}); + defer allocator.free(v); + try writer.writeRecord(&[_][]const u8{ "meta", key, v, "" }); + } + + fn valueTypeTag(value: Value) []const u8 { + return switch (value) { + .Bool => "bool", + .Int => "int", + .Float => "float", + .Bytes => "bytes", + }; + } + + fn valueDataToString(allocator: Allocator, value: Value) ![]u8 { + return switch (value) { + .Bool => |v| allocator.dupe(u8, if (v) "1" else "0"), + .Int => |v| std.fmt.allocPrint(allocator, "{d}", .{v}), + .Float => |v| std.fmt.allocPrint(allocator, "{d}", .{v}), + .Bytes => |v| encodeHexLower(allocator, v), + }; + } + + fn parseValueFromStrings(allocator: Allocator, type_s: []const u8, data_s: []const u8) !Value { + if (std.mem.eql(u8, type_s, "bool")) { + const bool_v = std.mem.eql(u8, data_s, "1") or std.ascii.eqlIgnoreCase(data_s, "true"); + return Value{ .Bool = bool_v }; + } + if (std.mem.eql(u8, type_s, "int")) { + const int_v = try std.fmt.parseInt(i64, data_s, 10); + return Value{ .Int = int_v }; + } + if (std.mem.eql(u8, type_s, "float")) { + const float_v = try std.fmt.parseFloat(f64, data_s); + return Value{ .Float = float_v }; + } + if (std.mem.eql(u8, type_s, "bytes")) { + const bytes = try parseHexBytes(allocator, data_s); + return Value{ .Bytes = bytes }; + } + return error.InvalidValueType; + } + + fn parseHexBytes(allocator: Allocator, hex: []const u8) ![]u8 { + if (hex.len % 2 != 0) return error.InvalidHexLength; + + var out = try allocator.alloc(u8, hex.len / 2); + errdefer allocator.free(out); + + var i: usize = 0; + while (i < out.len) : (i += 1) { + out[i] = try std.fmt.parseInt(u8, hex[i * 2 .. 
i * 2 + 2], 16); + } + + return out; + } + + fn encodeHexLower(allocator: Allocator, bytes: []const u8) ![]u8 { + const table = "0123456789abcdef"; + var out = try allocator.alloc(u8, bytes.len * 2); + errdefer allocator.free(out); + + for (bytes, 0..) |b, i| { + out[i * 2] = table[b >> 4]; + out[i * 2 + 1] = table[b & 0x0F]; + } + + return out; + } + }; +} + +pub const DiskEntry = DiskEntryImpl; diff --git a/src/tsm/tsm.zig b/src/tsm/tsm.zig index f3e96d8..12aa277 100644 --- a/src/tsm/tsm.zig +++ b/src/tsm/tsm.zig @@ -1,32 +1,39 @@ const std = @import("std"); const testing = std.testing; -const cache = @import("cache.zig"); -const entry = @import("entry.zig"); const Allocator = std.mem.Allocator; -const Cache = cache.Cache; -const DiskEntry = entry.DiskEntry; -pub const TimestampEncoding = entry.TimestampEncoding; -pub fn TsmTreeImpl(comptime max_level: u64, comptime page_size: u32, comptime ts_encoding: TimestampEncoding) type { +const cache = @import("cache.zig"); +const entry = @import("entry.zig"); +const entry_csv = @import("entry_csv.zig"); +const types = @import("types.zig"); +pub const TimestampEncoding = types.TimestampEncoding; + +pub fn TsmTreeImpl( + comptime max_level: u64, + comptime page_size: u32, + comptime ts_encoding: TimestampEncoding, + comptime DiskEntry: fn (comptime u32, comptime TimestampEncoding) type, +) type { return struct { const Self = @This(); const MAX_CACHE_POINTS = 1_000_000; pub const timestamp_encoding = ts_encoding; + const CacheType = cache.CacheImplWithEntry(page_size, ts_encoding, DiskEntry); allocator: Allocator, name: []const u8, entries: []*DiskEntry(page_size, ts_encoding), entries_count: u64 = 0, - cache: *Cache(page_size, ts_encoding), + cache: *CacheType, pub const QueryOp = enum { AVG, MIN, MAX, SUM, COUNT }; - pub const DataPoint = Cache(page_size, ts_encoding).DataPoint; - pub const Value = Cache(page_size, ts_encoding).Value; + pub const DataPoint = CacheType.DataPoint; + pub const Value = CacheType.Value; pub fn 
init(allocator: Allocator, name: []const u8) !Self { - const cache_ptr = try allocator.create(Cache(page_size, ts_encoding)); - cache_ptr.* = Cache(page_size, ts_encoding).init(allocator); + const cache_ptr = try allocator.create(CacheType); + cache_ptr.* = CacheType.init(allocator); return Self{ .allocator = allocator, .name = try allocator.dupe(u8, name), @@ -70,6 +77,7 @@ pub fn TsmTreeImpl(comptime max_level: u64, comptime page_size: u32, comptime ts pub fn insertBulk(self: *Self, series_key: []const u8, data_points: []DataPoint) !void { for (data_points) |data_point| { try self.cache.insert(series_key, data_point); + if (self.cache.count > MAX_CACHE_POINTS) try self.flush(); } } @@ -233,7 +241,94 @@ pub fn TsmTreeImpl(comptime max_level: u64, comptime page_size: u32, comptime ts }; } -pub const TsmTree = TsmTreeImpl(100_000, 4096, .gorilla); +pub const TsmTreeNative = TsmTreeImpl(100_000, 4096, .gorilla, entry.DiskEntry); +pub const TsmTreeCsv = TsmTreeImpl(100_000, 4096, .gorilla, entry_csv.DiskEntry); +pub const TsmTree = TsmTreeNative; + +pub const TsmTreeRuntime = union(enum) { + const Self = @This(); + + Native: TsmTreeNative, + CSV: TsmTreeCsv, + + pub const QueryOp = TsmTreeNative.QueryOp; + pub const DataPoint = TsmTreeNative.DataPoint; + pub const Value = TsmTreeNative.Value; + pub const EntryBackend = enum { + Native, + CSV, + }; + + pub fn init(allocator: Allocator, name: []const u8, backend: EntryBackend) !Self { + return switch (backend) { + .Native => .{ .Native = try TsmTreeNative.init(allocator, name) }, + .CSV => .{ .CSV = try TsmTreeCsv.init(allocator, name) }, + }; + } + + pub fn deinit(self: *Self) void { + switch (self.*) { + .Native => |*tree| tree.deinit(), + .CSV => |*tree| tree.deinit(), + } + } + + pub fn flush(self: *Self) !void { + return switch (self.*) { + .Native => |*tree| tree.flush(), + .CSV => |*tree| tree.flush(), + }; + } + + pub fn insert(self: *Self, series_key: []const u8, data_point: DataPoint) !void { + return switch 
(self.*) { + .Native => |*tree| tree.insert(series_key, data_point), + .CSV => |*tree| tree.insert(series_key, data_point), + }; + } + + pub fn insertBulk(self: *Self, series_key: []const u8, data_points: []DataPoint) !void { + return switch (self.*) { + .Native => |*tree| tree.insertBulk(series_key, data_points), + .CSV => |*tree| tree.insertBulk(series_key, data_points), + }; + } + + pub fn insertBulkSeries(self: *Self, series_keys: []const []const u8, data_points: [][]DataPoint) !void { + return switch (self.*) { + .Native => |*tree| tree.insertBulkSeries(series_keys, data_points), + .CSV => |*tree| tree.insertBulkSeries(series_keys, data_points), + }; + } + + pub fn query(self: *Self, series_key: []const u8, timestamp_start: i64, timestamp_end: i64, op: QueryOp) !Value { + return switch (self.*) { + .Native => |*tree| tree.query(series_key, timestamp_start, timestamp_end, op), + .CSV => |*tree| tree.query(series_key, timestamp_start, timestamp_end, op), + }; + } + + pub fn queryRaw(self: *Self, series_key: []const u8, timestamp_start: i64, timestamp_end: i64) ![]Value { + return switch (self.*) { + .Native => |*tree| tree.queryRaw(series_key, timestamp_start, timestamp_end), + .CSV => |*tree| tree.queryRaw(series_key, timestamp_start, timestamp_end), + }; + } + + pub fn queryLatest(self: *Self, series_key: []const u8) !DataPoint { + return switch (self.*) { + .Native => |*tree| tree.queryLatest(series_key), + .CSV => |*tree| tree.queryLatest(series_key), + }; + } + + pub fn queryFirst(self: *Self, series_key: []const u8) !DataPoint { + return switch (self.*) { + .Native => |*tree| tree.queryFirst(series_key), + .CSV => |*tree| tree.queryFirst(series_key), + }; + } +}; test { _ = cache; diff --git a/src/tsm/types.zig b/src/tsm/types.zig new file mode 100644 index 0000000..e61bf7f --- /dev/null +++ b/src/tsm/types.zig @@ -0,0 +1,28 @@ +pub const TimestampEncoding = enum { + /// Zigzag varint delta encoding - faster queries + delta, + /// Delta-of-delta bit 
encoding - better compression + gorilla, +}; + +pub const Value = union(enum) { + Bool: bool, + Int: i64, + Float: f64, + Bytes: []const u8, + + pub fn compare(self: Value, b: Value) @import("std").math.Order { + const std = @import("std"); + return switch (self) { + .Int => |val| std.math.order(val, b.Int), + .Float => |val| std.math.order(val, b.Float), + .Bytes => |val| std.mem.order(u8, val, b.Bytes), + .Bool => |val| std.math.order(@intFromBool(val), @intFromBool(b.Bool)), + }; + } +}; + +pub const DataPoint = struct { + timestamp: i64, + value: Value, +};