Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions include/rayforce.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ typedef enum {
typedef union ray_t {
/* Allocated: object header */
struct {
/* Bytes 0-15: slice / sym_dict / str_pool / index / link arm.
/* Bytes 0-15: slice / str_pool / index / link arm.
* Null state is sentinel-encoded in the payload (see
* src/vec/vec.c); this 16-byte slot carries no bitmap bits.
* The `nullmap` name is retained as the raw-byte view used by
Expand All @@ -123,7 +123,6 @@ typedef union ray_t {
union {
uint8_t nullmap[16];
struct { union ray_t* slice_parent; int64_t slice_offset; };
struct { uint8_t _aux_sym_lo[8]; union ray_t* sym_dict; };
struct { uint8_t _aux_str_lo[8]; union ray_t* str_pool; };
/* RAY_ATTR_HAS_INDEX (vectors): ray_t* of type RAY_INDEX
* carrying the accelerator payload and the saved nullmap
Expand Down
7 changes: 0 additions & 7 deletions src/lang/eval.c
Original file line number Diff line number Diff line change
Expand Up @@ -1151,13 +1151,6 @@ ray_t* gather_by_idx(ray_t* vec, int64_t* idx, int64_t n) {
if (ray_vec_is_null(vec, idx[i]))
ray_vec_set_null(result, i, true);
}
const ray_t* dict_owner = (vec->attrs & RAY_ATTR_SLICE) ? vec->slice_parent : vec;
if (dict_owner &&
!(dict_owner->attrs & RAY_ATTR_SLICE) &&
dict_owner->sym_dict) {
ray_retain(dict_owner->sym_dict);
result->sym_dict = dict_owner->sym_dict;
}
return result;
}

Expand Down
2 changes: 1 addition & 1 deletion src/mem/heap.c
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ static void ray_release_owned_refs(ray_t* v) {
}

/* Vector with attached index: nullmap[0..7] holds an owning ref to
* the index ray_t. The index owns the displaced str_pool / sym_dict,
* the index ray_t. The index owns the displaced str_pool,
* so we must NOT also try to release those off the parent — they
* aren't there anymore. Skip the STR_pool branch. */
if (v->attrs & RAY_ATTR_HAS_INDEX) {
Expand Down
6 changes: 3 additions & 3 deletions src/mem/heap.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@
#define RAY_ATTR_HNSW 0x04

/* Vector is a linked column. The 8 bytes of the nullmap union at offset
* 8 (i.e. parent->_idx_pad / parent->slice_offset / parent->sym_dict /
* parent->str_pool slot, depending on which arm is in use) hold an int64
* 8 (i.e. parent->_idx_pad / parent->slice_offset / parent->str_pool
* slot, depending on which arm is in use) hold an int64
* sym ID naming the target table. Resolved against the global env at
* deref time. Restricted to RAY_I32 / RAY_I64 vectors — STR/SYM/SLICE
* already use bytes 8-15 for their own pointers/data so HAS_LINK on
Expand All @@ -101,7 +101,7 @@

/* Vector carries an attached accelerator index in nullmap[0..7] (a ray_t*
* of type RAY_INDEX). The original 16-byte nullmap union content
* (slice_offset, str_pool, sym_dict, link_target) is preserved inside the
* (slice_offset, str_pool, link_target) is preserved inside the
* index ray_t and restored on detach.
*
* HAS_NULLS is preserved on the parent across attach/detach; many call
Expand Down
13 changes: 0 additions & 13 deletions src/ops/collection.c
Original file line number Diff line number Diff line change
Expand Up @@ -713,17 +713,6 @@ int atom_eq(ray_t* a, ray_t* b) {
/* Forward declaration */
ray_t* list_to_typed_vec(ray_t* list, int8_t orig_vec_type);

static void propagate_sym_dict(ray_t* dst, const ray_t* src) {
if (!dst || !src || dst->type != RAY_SYM || src->type != RAY_SYM) return;
const ray_t* owner = (src->attrs & RAY_ATTR_SLICE) ? src->slice_parent : src;
if (owner &&
!(owner->attrs & RAY_ATTR_SLICE) &&
owner->sym_dict) {
ray_retain(owner->sym_dict);
dst->sym_dict = owner->sym_dict;
}
}

/* Eager vector dedup — called by the DAG executor's OP_DISTINCT case.
* Factored out so the executor doesn't go through ray_distinct_fn, which
* is now a lazy producer for vectors and would re-wrap into a chain. */
Expand Down Expand Up @@ -1348,7 +1337,6 @@ ray_t* ray_take_fn(ray_t* vec, ray_t* n_obj) {
* source's str_pool by pool_off — propagate the pool ray_t
* (with retain) so the result owns a valid backing store. */
if (vtype == RAY_STR) col_propagate_str_pool(result, vec);
if (vtype == RAY_SYM) propagate_sym_dict(result, vec);
/* Propagate null bitmap — check parent's flag for slices */
bool has_nulls = (vec->attrs & RAY_ATTR_HAS_NULLS) ||
((vec->attrs & RAY_ATTR_SLICE) && vec->slice_parent &&
Expand Down Expand Up @@ -1544,7 +1532,6 @@ ray_t* ray_take_fn(ray_t* vec, ray_t* n_obj) {
* past the SSO threshold, tripping the assertion in
* ray_str_t_ptr / strsort_repack_window / strkey_cmp. */
if (vtype == RAY_STR) col_propagate_str_pool(result, vec);
if (vtype == RAY_SYM) propagate_sym_dict(result, vec);
/* Propagate null bitmap — check parent's flag for slices */
bool has_nulls = len > 0 &&
((vec->attrs & RAY_ATTR_HAS_NULLS) ||
Expand Down
54 changes: 41 additions & 13 deletions src/ops/expr.c
Original file line number Diff line number Diff line change
Expand Up @@ -853,13 +853,21 @@ static void expr_exec_unary(uint8_t opcode, int8_t dt, void* dp,
} else if (dt == RAY_BOOL) {
uint8_t* d = (uint8_t*)dp;
if (opcode == OP_CAST) {
/* (as 'BOOL ...) — truthy semantics, not truncation. */
/* (as 'BOOL ...) — truthy semantics, but treat null sentinel
* as false (BOOL is non-nullable, so we can't preserve null
* structurally; a SQL-style "missing → not true" mapping is
* the least-surprising convention). For F64, NULL_F64 = NaN:
* the IEEE `NaN != 0.0` is true, so add an explicit NaN check
* (`a[j] == a[j]` is false iff NaN). For I64, NULL_I64 =
* INT64_MIN is a regular non-zero value, so skip it. */
if (t1 == RAY_F64) {
const double* a = (const double*)ap;
for (int64_t j = 0; j < n; j++) d[j] = (a[j] != 0.0) ? 1 : 0;
for (int64_t j = 0; j < n; j++)
d[j] = (a[j] != 0.0 && a[j] == a[j]) ? 1 : 0;
} else {
const int64_t* a = (const int64_t*)ap;
for (int64_t j = 0; j < n; j++) d[j] = a[j] ? 1 : 0;
for (int64_t j = 0; j < n; j++)
d[j] = (a[j] != 0 && a[j] != NULL_I64) ? 1 : 0;
}
} else {
const uint8_t* a = (const uint8_t*)ap;
Expand Down Expand Up @@ -1358,12 +1366,27 @@ ray_t* exec_elementwise_unary(ray_graph_t* g, ray_op_t* op, ray_t* input) {
out_off += n;
}
} else if (in_type == RAY_I64 && out_type == RAY_BOOL) {
/* ISNULL over a non-null vec: always false */
while (ray_morsel_next(&m)) {
int64_t n = m.morsel_len;
uint8_t* dst = (uint8_t*)((char*)ray_data(result) + out_off);
for (int64_t i = 0; i < n; i++) dst[i] = 0;
out_off += n;
if (opc == OP_ISNULL) {
/* ISNULL over a non-null vec: always false here; the
* null-propagation pass at the end of the function sets
* dst[i]=1 for null rows of the input. */
while (ray_morsel_next(&m)) {
int64_t n = m.morsel_len;
uint8_t* dst = (uint8_t*)((char*)ray_data(result) + out_off);
for (int64_t i = 0; i < n; i++) dst[i] = 0;
out_off += n;
}
} else if (opc == OP_CAST) {
/* (as 'BOOL i64_col) — truthy semantics; NULL_I64 = INT64_MIN
* sentinel is non-zero but logically missing, so skip it. */
while (ray_morsel_next(&m)) {
int64_t n = m.morsel_len;
int64_t* src = (int64_t*)m.morsel_ptr;
uint8_t* dst = (uint8_t*)((char*)ray_data(result) + out_off);
for (int64_t i = 0; i < n; i++)
dst[i] = (src[i] != 0 && src[i] != NULL_I64) ? 1 : 0;
out_off += n;
}
}
} else if (in_type == RAY_BOOL && opc == OP_NOT) {
while (ray_morsel_next(&m)) {
Expand Down Expand Up @@ -1485,7 +1508,11 @@ ray_t* exec_elementwise_unary(ray_graph_t* g, ray_op_t* op, ray_t* input) {
double* src = (double*)m.morsel_ptr;
uint8_t* dst = (uint8_t*)((char*)ray_data(result) + out_off);
if (out_type == RAY_BOOL)
for (int64_t i = 0; i < n; i++) dst[i] = (src[i] != 0.0) ? 1 : 0;
/* NaN (NULL_F64 sentinel) is "missing"; IEEE
* `NaN != 0.0` is true so add an explicit
* `src[i] == src[i]` to filter NaN to false. */
for (int64_t i = 0; i < n; i++)
dst[i] = (src[i] != 0.0 && src[i] == src[i]) ? 1 : 0;
else
for (int64_t i = 0; i < n; i++) dst[i] = (uint8_t)src[i];
out_off += n;
Expand Down Expand Up @@ -1818,7 +1845,8 @@ static void binary_range(ray_op_t* op, int8_t out_type,
case OP_ADD: for(int64_t i=0;i<n;i++){int32_t li=(int32_t)LV_READ(i),ri=(int32_t)RV_READ(i);odst[i]=(int32_t)((uint32_t)li+(uint32_t)ri);}break;
case OP_SUB: for(int64_t i=0;i<n;i++){int32_t li=(int32_t)LV_READ(i),ri=(int32_t)RV_READ(i);odst[i]=(int32_t)((uint32_t)li-(uint32_t)ri);}break;
case OP_MUL: for(int64_t i=0;i<n;i++){int32_t li=(int32_t)LV_READ(i),ri=(int32_t)RV_READ(i);odst[i]=(int32_t)((uint32_t)li*(uint32_t)ri);}break;
case OP_DIV: for(int64_t i=0;i<n;i++){int32_t li=(int32_t)LV_READ(i),ri=(int32_t)RV_READ(i);int32_t r;if(ri==0||(ri==-1&&li==((int32_t)1<<31))){r=0;}else{r=li/ri;if((li^ri)<0&&r*ri!=li)r--;}odst[i]=r;}break;
/* OP_DIV omitted — ray_binop hard-codes F64 for OP_DIV, so
* narrow-output OP_DIV is unreachable through any caller. */
case OP_IDIV:for(int64_t i=0;i<n;i++){double lv=LV_READ(i),rv=RV_READ(i);odst[i]=rv!=0.0?(int32_t)floor(lv/rv):0;}break;
case OP_MOD: for(int64_t i=0;i<n;i++){int32_t li=(int32_t)LV_READ(i),ri=(int32_t)RV_READ(i);int32_t r;if(ri==0||(ri==-1&&li==((int32_t)1<<31))){r=0;}else{r=li%ri;if(r&&(r^ri)<0)r+=ri;}odst[i]=r;}break;
case OP_MIN2:for(int64_t i=0;i<n;i++){int32_t li=(int32_t)LV_READ(i),ri=(int32_t)RV_READ(i);odst[i]=li<ri?li:ri;}break;
Expand All @@ -1831,7 +1859,7 @@ static void binary_range(ray_op_t* op, int8_t out_type,
case OP_ADD: for(int64_t i=0;i<n;i++){int16_t li=(int16_t)LV_READ(i),ri=(int16_t)RV_READ(i);odst[i]=(int16_t)((uint16_t)li+(uint16_t)ri);}break;
case OP_SUB: for(int64_t i=0;i<n;i++){int16_t li=(int16_t)LV_READ(i),ri=(int16_t)RV_READ(i);odst[i]=(int16_t)((uint16_t)li-(uint16_t)ri);}break;
case OP_MUL: for(int64_t i=0;i<n;i++){int16_t li=(int16_t)LV_READ(i),ri=(int16_t)RV_READ(i);odst[i]=(int16_t)((uint16_t)li*(uint16_t)ri);}break;
case OP_DIV: for(int64_t i=0;i<n;i++){int16_t li=(int16_t)LV_READ(i),ri=(int16_t)RV_READ(i);odst[i]=ri?li/ri:0;}break;
/* OP_DIV omitted — unreachable, see I32 arm. */
case OP_IDIV:for(int64_t i=0;i<n;i++){double lv=LV_READ(i),rv=RV_READ(i);odst[i]=rv!=0.0?(int16_t)floor(lv/rv):0;}break;
case OP_MOD: for(int64_t i=0;i<n;i++){int16_t li=(int16_t)LV_READ(i),ri=(int16_t)RV_READ(i);odst[i]=ri?li%ri:0;}break;
case OP_MIN2:for(int64_t i=0;i<n;i++){int16_t li=(int16_t)LV_READ(i),ri=(int16_t)RV_READ(i);odst[i]=li<ri?li:ri;}break;
Expand All @@ -1844,7 +1872,7 @@ static void binary_range(ray_op_t* op, int8_t out_type,
case OP_ADD: for(int64_t i=0;i<n;i++){uint8_t li=(uint8_t)LV_READ(i),ri=(uint8_t)RV_READ(i);odst[i]=li+ri;}break;
case OP_SUB: for(int64_t i=0;i<n;i++){uint8_t li=(uint8_t)LV_READ(i),ri=(uint8_t)RV_READ(i);odst[i]=li-ri;}break;
case OP_MUL: for(int64_t i=0;i<n;i++){uint8_t li=(uint8_t)LV_READ(i),ri=(uint8_t)RV_READ(i);odst[i]=li*ri;}break;
case OP_DIV: for(int64_t i=0;i<n;i++){uint8_t li=(uint8_t)LV_READ(i),ri=(uint8_t)RV_READ(i);odst[i]=ri?li/ri:0;}break;
/* OP_DIV omitted — unreachable, see I32 arm. */
case OP_IDIV:for(int64_t i=0;i<n;i++){double lv=LV_READ(i),rv=RV_READ(i);odst[i]=rv!=0.0?(uint8_t)floor(lv/rv):0;}break;
case OP_MOD: for(int64_t i=0;i<n;i++){uint8_t li=(uint8_t)LV_READ(i),ri=(uint8_t)RV_READ(i);odst[i]=ri?li%ri:0;}break;
case OP_MIN2:for(int64_t i=0;i<n;i++){uint8_t li=(uint8_t)LV_READ(i),ri=(uint8_t)RV_READ(i);odst[i]=li<ri?li:ri;}break;
Expand Down
31 changes: 30 additions & 1 deletion src/ops/fused_group.c
Original file line number Diff line number Diff line change
Expand Up @@ -515,8 +515,37 @@ static inline uint8_t fp_eval_cmp_one(const fp_cmp_t* p, int64_t row) {
return (uint8_t)(p->fold == FP_FOLD_TRUE);
if (p->col_type == RAY_SYM && !p->cval_in_dict)
return (uint8_t)(p->op == FP_NE);
if (p->op == FP_LIKE)
if (p->op == FP_LIKE) {
if (p->col_type == RAY_SYM) {
uint64_t sid = (uint64_t)read_by_esz(p->col_base, row, p->col_esz);
if (sid >= p->like_lut_count || !p->like_lut || !p->like_sym_strings)
return 0;
uint8_t state = p->like_lut[sid];
if (!state) {
ray_t* s = p->like_sym_strings[sid];
uint8_t match = 0;
if (s) {
const char* sp = ray_str_ptr(s);
size_t sl = ray_str_len(s);
match = (p->pat_compiled.shape != RAY_GLOB_SHAPE_NONE)
? (uint8_t)ray_glob_match_compiled(&p->pat_compiled, sp, sl)
: (uint8_t)ray_glob_match(sp, sl, p->pat_str, p->pat_len);
}
state = (uint8_t)(match ? 2 : 1);
p->like_lut[sid] = state;
}
return (uint8_t)(state == 2);
}
if (p->col_type == RAY_STR) {
size_t sl = 0;
const char* sp = ray_str_vec_get(p->col_obj, row, &sl);
if (!sp) sp = "";
return (p->pat_compiled.shape != RAY_GLOB_SHAPE_NONE)
? (uint8_t)ray_glob_match_compiled(&p->pat_compiled, sp, sl)
: (uint8_t)ray_glob_match(sp, sl, p->pat_str, p->pat_len);
}
return 0;
}

int64_t v = fp_cmp_read_i64_at(p, row);
if (p->op == FP_IN) {
Expand Down
10 changes: 0 additions & 10 deletions src/ops/fused_topk.c
Original file line number Diff line number Diff line change
Expand Up @@ -292,16 +292,6 @@ ray_t* ray_fused_topk_select(ray_t* tbl,
&& kt != RAY_I16 && kt != RAY_I32 && kt != RAY_I64
&& kt != RAY_DATE && kt != RAY_TIME && kt != RAY_TIMESTAMP)
return NULL;
/* The SYM comparator (fpk_cmp) resolves dict IDs through the
* GLOBAL sym_strings snapshot (ctx.sym_strings). A column with
* its own per-vector sym_dict stores LOCAL indices that don't
* map to the global table, so comparisons would order against
* the wrong strings. Reject and fall back. */
if (kt == RAY_SYM) {
const ray_t* dict_owner = (col->attrs & RAY_ATTR_SLICE)
? col->slice_parent : col;
if (dict_owner && dict_owner->sym_dict) return NULL;
}
ctx.keys[i].type = kt;
ctx.keys[i].attrs = col->attrs;
ctx.keys[i].esz = ray_sym_elem_size(kt, col->attrs);
Expand Down
2 changes: 1 addition & 1 deletion src/ops/idxop.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ static inline ray_index_t* ray_index_payload(ray_t* idx) {

/* Build an accelerator and attach. Numeric types only for v1
* (BOOL/U8/I16/I32/I64/F32/F64/DATE/TIME/TIMESTAMP — RAY_STR/RAY_SYM/RAY_GUID
* deferred until the str_pool/sym_dict displacement sweep is complete).
* deferred until the str_pool displacement sweep is complete).
* On success, *vp is the (possibly new) parent vector with HAS_INDEX set.
* On failure, *vp is unchanged and a RAY_ERROR is returned. */
ray_t* ray_index_attach_zone (ray_t** vp);
Expand Down
14 changes: 2 additions & 12 deletions src/ops/linkop.c
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ ray_t* ray_link_deref(ray_t* v, int64_t sym_id) {
int64_t target_n = target_col->len;
int8_t out_type = target_col->type;

/* Resolve through slices: SYM-width and (later) sym_dict / str_pool
/* Resolve through slices: SYM-width and (later) str_pool
* all live on the slice_parent's attrs/union, never on the slice
* itself. The slice contributes only its [slice_offset, len) view.
* Compute the canonical width and base-pointer once here so the
Expand Down Expand Up @@ -265,19 +265,9 @@ ray_t* ray_link_deref(ray_t* v, int64_t sym_id) {
}

/* Type-specific metadata propagation.
* RAY_STR: share the source pool so ray_str_t pool_offs are valid.
* RAY_SYM: if the source column carries a local sym_dict, share it.
* sym_dict aliases bytes 8-15 of the nullmap union and is safe
* to read on any non-slice SYM vec — sentinel-encoded nulls
* don't consume those bytes. */
* RAY_STR: share the source pool so ray_str_t pool_offs are valid. */
if (out_type == RAY_STR) {
col_propagate_str_pool(result, target_col);
} else if (out_type == RAY_SYM) {
if (col_owner && !(col_owner->attrs & RAY_ATTR_SLICE) &&
col_owner->sym_dict) {
ray_retain(col_owner->sym_dict);
result->sym_dict = col_owner->sym_dict;
}
}
return result;
}
Expand Down
47 changes: 35 additions & 12 deletions src/ops/query.c
Original file line number Diff line number Diff line change
Expand Up @@ -6107,6 +6107,25 @@ ray_t* ray_select(ray_t** args, int64_t n) {
}
}

/* Pre-compute the top-count-take emit filter so the no-WHERE
* count-key DAG decision (around line 7541) can see it. The
* actual thread-local set is still deferred to immediately
* before ray_execute (see below) so state-leakage on error
* paths in between is bounded. Without this hoist the decision
* read at compile time always sees an unset filter and the
* fp_try_i32_mg_top_count fast path is unreachable for
* `select count by k take N desc` shapes. */
ray_group_emit_filter_t pre_top_emit = {0};
bool pre_top_emit_matched = false;
if (by_expr) {
ray_group_emit_filter_t cur_emit = ray_group_emit_filter_get();
if (!cur_emit.enabled &&
match_group_desc_count_take(dict_elems, dict_n, from_id, where_id,
by_id, take_id, asc_id, desc_id,
&pre_top_emit))
pre_top_emit_matched = true;
}

/* GROUP BY */
if (by_expr) {
/* Resolve a "single key" sym id when by_expr is either a
Expand Down Expand Up @@ -7537,7 +7556,14 @@ ray_t* ray_select(ray_t** args, int64_t n) {
agg_kinds_ok = 0;
}
int no_where_count_key_ok = 0;
ray_group_emit_filter_t no_where_emit = ray_group_emit_filter_get();
/* Use the pre-computed filter when available so this
* read agrees with what will actually be installed
* just before ray_execute. Falling back to a live
* get() preserves behaviour for any caller that
* pre-set the filter outside ray_select. */
ray_group_emit_filter_t no_where_emit = pre_top_emit_matched
? pre_top_emit
: ray_group_emit_filter_get();
if (!where_expr && n_keys == 1 && no_where_emit.enabled &&
no_where_emit.agg_index == 0 &&
no_where_emit.top_count_take > 0) {
Expand Down Expand Up @@ -8645,19 +8671,16 @@ ray_t* ray_select(ray_t** args, int64_t n) {
}
}

/* Install the pre-computed top-count emit filter just before
* ray_execute reads it (and the DAG built above which already
* consumed pre_top_emit via the no_where_count_key_ok check).
* No re-running of match_group_desc_count_take needed. */
ray_group_emit_filter_t prev_self_emit = {0};
bool self_emit_set = false;
if (by_expr) {
ray_group_emit_filter_t cur_emit = ray_group_emit_filter_get();
ray_group_emit_filter_t top_emit = {0};
if (!cur_emit.enabled &&
match_group_desc_count_take(dict_elems, dict_n, from_id, where_id,
by_id, take_id, asc_id, desc_id,
&top_emit)) {
prev_self_emit = cur_emit;
ray_group_emit_filter_set(top_emit);
self_emit_set = true;
}
if (pre_top_emit_matched) {
prev_self_emit = ray_group_emit_filter_get();
ray_group_emit_filter_set(pre_top_emit);
self_emit_set = true;
}

/* Optimize and execute */
Expand Down
Loading
Loading