@@ -367,6 +367,17 @@ ray_t* ray_select_fn(ray_t** args, int64_t n) {
367367 if (key_col && (key_col -> type == RAY_LIST || key_col -> type == RAY_STR || key_col -> type == RAY_GUID ))
368368 use_eval_group = 1 ;
369369 }
370+ /* Force eval-level grouping when any output column is a
371+ * non-aggregation expression (lambda call, arithmetic, etc.)
372+ * that the DAG group path would silently drop. */
373+ if (!use_eval_group && n_out > 0 ) {
374+ for (int64_t i = 0 ; i + 1 < dict_n ; i += 2 ) {
375+ int64_t kid = dict_elems [i ]-> i64 ;
376+ if (kid == from_id || kid == where_id || kid == by_id ||
377+ kid == take_id || kid == asc_id || kid == desc_id ) continue ;
378+ if (!is_agg_expr (dict_elems [i + 1 ])) { use_eval_group = 1 ; break ; }
379+ }
380+ }
370381 if (use_eval_group ) {
371382 /* Apply WHERE filter first (if any), then eval-level groupby */
372383 ray_t * eval_tbl = tbl ;
@@ -425,52 +436,100 @@ ray_t* ray_select_fn(ray_t** args, int64_t n) {
425436 int64_t kid = dict_elems [i ]-> i64 ;
426437 if (kid == from_id || kid == where_id || kid == by_id || kid == take_id || kid == asc_id || kid == desc_id ) continue ;
427438 ray_t * val_expr_item = dict_elems [i + 1 ];
428- if (!is_agg_expr (val_expr_item )) continue ;
429439
430- ray_t * * agg_elems = (ray_t * * )ray_data (val_expr_item );
431- ray_t * agg_fn_name = agg_elems [0 ];
432- ray_t * agg_col_expr = agg_elems [1 ];
440+ if (is_agg_expr (val_expr_item )) {
441+ ray_t * * agg_elems = (ray_t * * )ray_data (val_expr_item );
442+ ray_t * agg_fn_name = agg_elems [0 ];
443+ ray_t * agg_col_expr = agg_elems [1 ];
433444
434- /* Resolve source column from filtered table */
435- ray_t * src_col_val = NULL ;
436- if (agg_col_expr -> type == - RAY_SYM && (agg_col_expr -> attrs & RAY_ATTR_NAME )) {
437- src_col_val = ray_table_get_col (eval_tbl , agg_col_expr -> i64 );
438- if (src_col_val ) ray_retain (src_col_val );
439- }
440- if (!src_col_val ) {
441- src_col_val = ray_eval (agg_col_expr );
442- if (RAY_IS_ERR (src_col_val )) { ray_release (groups ); if (eval_tbl != tbl ) ray_release (eval_tbl ); ray_release (tbl ); return src_col_val ; }
443- }
445+ /* Resolve source column from filtered table */
446+ ray_t * src_col_val = NULL ;
447+ if (agg_col_expr -> type == - RAY_SYM && (agg_col_expr -> attrs & RAY_ATTR_NAME )) {
448+ src_col_val = ray_table_get_col (eval_tbl , agg_col_expr -> i64 );
449+ if (src_col_val ) ray_retain (src_col_val );
450+ }
451+ if (!src_col_val ) {
452+ src_col_val = ray_eval (agg_col_expr );
453+ if (RAY_IS_ERR (src_col_val )) { ray_release (groups ); if (eval_tbl != tbl ) ray_release (eval_tbl ); ray_release (tbl ); return src_col_val ; }
454+ }
444455
445- /* For each group, compute aggregation */
446- ray_t * agg_vec = NULL ;
447- ray_t * * grp_items = (ray_t * * )ray_data (groups );
448- for (int64_t gi = 0 ; gi < n_groups ; gi ++ ) {
449- ray_t * idx_list = grp_items [gi * 2 + 1 ];
450- ray_t * subset = ray_at_fn (src_col_val , idx_list );
451- if (RAY_IS_ERR (subset )) continue ;
452- ray_t * agg_val = NULL ;
453- ray_t * fn_obj = ray_env_get (agg_fn_name -> i64 );
454- if (fn_obj && fn_obj -> type == RAY_UNARY ) {
455- ray_unary_fn uf = (ray_unary_fn )(uintptr_t )fn_obj -> i64 ;
456- agg_val = uf (subset );
456+ /* For each group, compute aggregation */
457+ ray_t * agg_vec = NULL ;
458+ ray_t * * grp_items = (ray_t * * )ray_data (groups );
459+ for (int64_t gi = 0 ; gi < n_groups ; gi ++ ) {
460+ ray_t * idx_list = grp_items [gi * 2 + 1 ];
461+ ray_t * subset = ray_at_fn (src_col_val , idx_list );
462+ if (RAY_IS_ERR (subset )) continue ;
463+ ray_t * agg_val = NULL ;
464+ ray_t * fn_obj = ray_env_get (agg_fn_name -> i64 );
465+ if (fn_obj && fn_obj -> type == RAY_UNARY ) {
466+ ray_unary_fn uf = (ray_unary_fn )(uintptr_t )fn_obj -> i64 ;
467+ agg_val = uf (subset );
468+ }
469+ ray_release (subset );
470+ if (!agg_val || RAY_IS_ERR (agg_val )) continue ;
471+
472+ if (!agg_vec ) {
473+ int8_t vt = - (agg_val -> type );
474+ agg_vec = ray_vec_new (vt , n_groups );
475+ if (RAY_IS_ERR (agg_vec )) { ray_release (agg_val ); break ; }
476+ agg_vec -> len = n_groups ;
477+ }
478+ store_typed_elem (agg_vec , gi , agg_val );
479+ ray_release (agg_val );
457480 }
458- ray_release (subset );
459- if (!agg_val || RAY_IS_ERR (agg_val )) continue ;
460-
461- if (!agg_vec ) {
462- int8_t vt = - (agg_val -> type );
463- agg_vec = ray_vec_new (vt , n_groups );
464- if (RAY_IS_ERR (agg_vec )) { ray_release (agg_val ); break ; }
465- agg_vec -> len = n_groups ;
481+ ray_release (src_col_val );
482+ agg_names [n_agg_out ] = kid ;
483+ agg_results [n_agg_out ] = agg_vec ;
484+ n_agg_out ++ ;
485+ } else {
486+ /* Non-aggregation expression (lambda, arithmetic, etc.):
487+ * bind table columns into scope, evaluate the expression
488+ * on the full table, then gather the last value per group. */
489+ ray_env_push_scope ();
490+ int64_t enc = ray_table_ncols (eval_tbl );
491+ for (int64_t ci = 0 ; ci < enc ; ci ++ ) {
492+ int64_t cn = ray_table_col_name (eval_tbl , ci );
493+ ray_t * cv = ray_table_get_col_idx (eval_tbl , ci );
494+ if (cv ) ray_env_set (cn , cv );
495+ }
496+ ray_t * full_val = ray_eval (val_expr_item );
497+ ray_env_pop_scope ();
498+ if (RAY_IS_ERR (full_val )) { ray_release (groups ); if (eval_tbl != tbl ) ray_release (eval_tbl ); ray_release (tbl ); return full_val ; }
499+
500+ ray_t * agg_vec = NULL ;
501+ ray_t * * grp_items = (ray_t * * )ray_data (groups );
502+ for (int64_t gi = 0 ; gi < n_groups ; gi ++ ) {
503+ ray_t * idx_list = grp_items [gi * 2 + 1 ];
504+ /* Take last index in each group */
505+ ray_t * last_val = NULL ;
506+ if (ray_is_vec (full_val )) {
507+ ray_t * subset = ray_at_fn (full_val , idx_list );
508+ if (subset && !RAY_IS_ERR (subset )) {
509+ last_val = ray_last_fn (subset );
510+ ray_release (subset );
511+ }
512+ } else {
513+ /* Scalar expression — replicate for each group */
514+ last_val = full_val ;
515+ ray_retain (last_val );
516+ }
517+ if (!last_val || RAY_IS_ERR (last_val )) continue ;
518+
519+ if (!agg_vec ) {
520+ int8_t vt = ray_is_atom (last_val ) ? - (last_val -> type ) : last_val -> type ;
521+ agg_vec = ray_vec_new (vt , n_groups );
522+ if (RAY_IS_ERR (agg_vec )) { ray_release (last_val ); break ; }
523+ agg_vec -> len = n_groups ;
524+ }
525+ store_typed_elem (agg_vec , gi , last_val );
526+ ray_release (last_val );
466527 }
467- store_typed_elem (agg_vec , gi , agg_val );
468- ray_release (agg_val );
528+ ray_release (full_val );
529+ agg_names [n_agg_out ] = kid ;
530+ agg_results [n_agg_out ] = agg_vec ;
531+ n_agg_out ++ ;
469532 }
470- ray_release (src_col_val );
471- agg_names [n_agg_out ] = kid ;
472- agg_results [n_agg_out ] = agg_vec ;
473- n_agg_out ++ ;
474533 }
475534
476535 /* Build result table: key column + aggregation columns */
0 commit comments