diff --git a/polars/__pycache__/groupby-polars.cpython-38.pyc b/polars/__pycache__/groupby-polars.cpython-38.pyc new file mode 100644 index 00000000..b476d5d7 Binary files /dev/null and b/polars/__pycache__/groupby-polars.cpython-38.pyc differ diff --git a/polars/groupby-polars.py b/polars/groupby-polars.py index 3804a5bd..e54024ff 100755 --- a/polars/groupby-polars.py +++ b/polars/groupby-polars.py @@ -6,7 +6,7 @@ import gc import timeit import polars as pl -from polars.lazy import col +from polars import col exec(open("./_helpers/helpers.py").read()) @@ -23,18 +23,18 @@ print("loading dataset %s" % data_name, flush=True) with pl.StringCache(): - x = pl.read_csv(src_grp, dtype={"id4":pl.Int32, "id5":pl.Int32, "id6":pl.Int32, "v1":pl.Int32, "v2":pl.Int32, "v3":pl.Float64}, low_memory=True) - x["id1"] = x["id1"].cast(pl.Categorical) - x["id1"].shrink_to_fit(in_place=True) - x["id2"] = x["id2"].cast(pl.Categorical) - x["id2"].shrink_to_fit(in_place=True) - x["id3"] = x["id3"].cast(pl.Categorical) - x["id3"].shrink_to_fit(in_place=True) + x = (pl.read_csv(src_grp, dtypes={"id4":pl.Int32, "id5":pl.Int32, "id6":pl.Int32, "v1":pl.Int32, "v2":pl.Int32, "v3":pl.Float64}, low_memory=True) + .with_columns(pl.col(["id1", "id2", "id3"]).cast(pl.Categorical))) in_rows = x.shape[0] +x.write_ipc("/tmp/tmp.ipc") +del x +x = pl.read_ipc("/tmp/tmp.ipc", memory_map=True) x = x.lazy() +# materialize print(len(x.collect()), flush=True) +in_rows = x.collect().shape[0] task_init = timeit.default_timer() print("grouping...", flush=True) @@ -42,7 +42,7 @@ question = "sum v1 by id1" # q1 gc.collect() t_start = timeit.default_timer() -ans = x.groupby("id1").agg(pl.sum("v1")).collect() +ans = x.groupby("id1").agg(pl.sum("v1").alias("v1_sum")).collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -53,7 +53,7 @@ del ans gc.collect() t_start = timeit.default_timer() -ans = x.groupby("id1").agg(pl.sum("v1")).collect() +ans = x.groupby("id1").agg(pl.sum("v1").alias("v1_sum")).collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -68,7 +68,7 @@ question = "sum v1 by id1:id2" # q2 gc.collect() t_start = timeit.default_timer() -ans = x.groupby(["id1","id2"]).agg(pl.sum("v1")).collect() +ans = x.groupby(["id1","id2"]).agg(pl.sum("v1").alias("v1_sum")).collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -79,7 +79,7 @@ del ans gc.collect() t_start = timeit.default_timer() -ans = x.groupby(["id1","id2"]).agg(pl.sum("v1")).collect() +ans = x.groupby(["id1","id2"]).agg(pl.sum("v1").alias("v1_sum")).collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -94,7 +94,7 @@ question = "sum v1 mean v3 by id3" # q3 gc.collect() t_start = timeit.default_timer() -ans = x.groupby("id3").agg([pl.sum("v1"), pl.mean("v3")]).collect() +ans = x.groupby("id3").agg([pl.sum("v1").alias("v1_sum"), pl.mean("v3").alias("v3_mean")]).collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -105,7 +105,7 @@ del ans gc.collect() t_start = timeit.default_timer() -ans = x.groupby("id3").agg([pl.sum("v1"), pl.mean("v3")]).collect() +ans = x.groupby("id3").agg([pl.sum("v1").alias("v1_sum"), pl.mean("v3").alias("v3_mean")]).collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -120,7 +120,7 @@ question = "mean v1:v3 by id4" # q4 gc.collect() t_start = timeit.default_timer() -ans = x.groupby("id4").agg([pl.mean("v1"), pl.mean("v2"), pl.mean("v3")]).collect() +ans = x.groupby("id4").agg([pl.mean("v1").alias("v1_mean"), pl.mean("v2").alias("v2_mean"), pl.mean("v3").alias("v3_mean")]).collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -131,7 +131,7 @@ del ans gc.collect() t_start = timeit.default_timer() -ans = x.groupby("id4").agg([pl.mean("v1"), pl.mean("v2"), pl.mean("v3")]).collect() +ans = x.groupby("id4").agg([pl.mean("v1").alias("v1_mean"), pl.mean("v2").alias("v2_mean"), pl.mean("v3").alias("v3_mean")]).collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -146,7 +146,7 @@ question = "sum v1:v3 by id6" # q5 gc.collect() t_start = timeit.default_timer() -ans = x.groupby("id6").agg([pl.sum("v1"), pl.sum("v2"), pl.sum("v3")]).collect() +ans = x.groupby("id6").agg([pl.sum("v1").alias("v1_sum"), pl.sum("v2").alias("v2_sum"), pl.sum("v3").alias("v3_sum")]).collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -157,7 +157,7 @@ del ans gc.collect() t_start = timeit.default_timer() -ans = x.groupby("id6").agg([pl.sum("v1"), pl.sum("v2"), pl.sum("v3")]).collect() +ans = x.groupby("id6").agg([pl.sum("v1").alias("v1_sum"), pl.sum("v2").alias("v2_sum"), pl.sum("v3").alias("v3_sum")]).collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -224,7 +224,7 @@ question = "largest two v3 by id6" # q8 gc.collect() t_start = timeit.default_timer() -ans = x.drop_nulls("v3").sort("v3", reverse=True).groupby("id6").agg(col("v3").head(2).alias("largest2_v3")).explode("largest2_v3").collect() +ans = x.drop_nulls("v3").groupby("id6").agg(col("v3").top_k(2).alias("largest2_v3")).explode("largest2_v3").collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -235,7 +235,7 @@ del ans gc.collect() t_start = timeit.default_timer() -ans = x.drop_nulls("v3").sort("v3", reverse=True).groupby("id6").agg(col("v3").head(2).alias("largest2_v3")).explode("largest2_v3").collect() +ans = x.drop_nulls("v3").groupby("id6").agg(col("v3").top_k(2).alias("largest2_v3")).explode("largest2_v3").collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() @@ -299,6 +299,6 @@ print(ans.tail(3), flush=True) del ans -print("grouping finished, took %0.fs" % (timeit.default_timer() - task_init), flush=True) +print("grouping finished, took %0.3fs" % (timeit.default_timer() - task_init), flush=True) exit(0) diff --git a/polars/join-polars.py b/polars/join-polars.py index 75ed5f73..cdcd059e 100755 --- a/polars/join-polars.py +++ b/polars/join-polars.py @@ -27,50 +27,83 @@ print("loading datasets " + data_name + ", " + y_data_name[0] + ", " + y_data_name[2] + ", " + y_data_name[2], flush=True) with pl.StringCache(): - x = pl.read_csv(src_jn_x, dtype={"id1":pl.Int32, "id2":pl.Int32, "id3":pl.Int32, "v1":pl.Float64}) - x["id4"] = x["id4"].cast(pl.Categorical) - x["id5"] = x["id5"].cast(pl.Categorical) - x["id6"] = x["id6"].cast(pl.Categorical) - small = pl.read_csv(src_jn_y[0], dtype={"id1":pl.Int32, "v2":pl.Float64}) - small["id4"] = small["id4"].cast(pl.Categorical) - medium = pl.read_csv(src_jn_y[1], dtype={"id1":pl.Int32, "id2":pl.Int32, "v2":pl.Float64}) - medium["id4"] = medium["id4"].cast(pl.Categorical) - medium["id5"] = medium["id5"].cast(pl.Categorical) - big = pl.read_csv(src_jn_y[2], dtype={"id1":pl.Int32, "id2":pl.Int32, "id3":pl.Int32, "v2":pl.Float64}) - big["id4"] = big["id4"].cast(pl.Categorical) - big["id5"] = big["id5"].cast(pl.Categorical) - big["id6"] = big["id6"].cast(pl.Categorical) + x = (pl.read_csv(src_jn_x, dtypes={"id1":pl.Int32, "id2":pl.Int32, "id3":pl.Int32, "v1":pl.Float64}) + .with_columns( + pl.col(["id4", "id5", "id6"]).cast(pl.Categorical) + ) + ) + small = pl.read_csv(src_jn_y[0], dtypes={"id1":pl.Int32, "v2":pl.Float64}) + small = small.with_columns( + pl.col("id4").cast(pl.Categorical) + ) + medium = (pl.read_csv(src_jn_y[1], dtypes={"id1":pl.Int32, "id2":pl.Int32, "v2":pl.Float64}) + .with_columns( + pl.col(["id4", "id5"]).cast(pl.Categorical), + )) + big = (pl.read_csv(src_jn_y[2], dtypes={"id1":pl.Int32, "id2":pl.Int32, "id3":pl.Int32, "v2":pl.Float64}) + .with_columns( + pl.col(["id4", "id5", "id6"]).cast(pl.Categorical) + )) print(len(x), flush=True) print(len(small), flush=True) print(len(medium), flush=True) print(len(big), flush=True) +with pl.StringCache(): + x.write_ipc("/tmp/x.ipc") + del x + x = pl.read_ipc("/tmp/x.ipc", memory_map=True) + x = x.lazy() + + small.write_ipc("/tmp/small.ipc") + del small + small = pl.read_ipc("/tmp/small.ipc", memory_map=True) + small = small.lazy() + + medium.write_ipc("/tmp/medium.ipc") + del medium + medium = pl.read_ipc("/tmp/medium.ipc", memory_map=True) + medium = medium.lazy() + + big.write_ipc("/tmp/big.ipc") + del big + big = pl.read_ipc("/tmp/big.ipc", memory_map=True) + big = big.lazy() + +# materialize +print(len(x.collect()), flush=True) +print(len(small.collect()), flush=True) +print(len(medium.collect()), flush=True) +print(len(big.collect()), flush=True) + +in_rows = x.collect().shape[0] + task_init = timeit.default_timer() print("joining...", flush=True) question = "small inner on int" # q1 gc.collect() t_start = timeit.default_timer() -ans = x.join(small, on="id1") +ans = x.join(small, on="id1").collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() chk = [ans["v1"].sum(), ans["v2"].sum()] chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() t_start = timeit.default_timer() -ans = x.join(small, on="id1") +ans = x.join(small, on="id1").collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() chk = [ans["v1"].sum(), ans["v2"].sum()] chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) print(ans.head(3), flush=True) print(ans.tail(3), flush=True) del ans @@ -78,25 +111,25 @@ question = "medium inner on int" # q2 gc.collect() t_start = timeit.default_timer() -ans = x.join(medium, on="id2") +ans = x.join(medium, on="id2").collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() chk = [ans["v1"].sum(), ans["v2"].sum()] chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() t_start = timeit.default_timer() -ans = x.join(medium, on="id2") +ans = x.join(medium, on="id2").collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() chk = [ans["v1"].sum(), ans["v2"].sum()] chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) print(ans.head(3), flush=True) print(ans.tail(3), flush=True) del ans @@ -104,25 +137,25 @@ question = "medium outer on int" # q3 gc.collect() t_start = timeit.default_timer() -ans = x.join(medium, how="left", on="id2") +ans = x.join(medium, how="left", on="id2").collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() chk = [ans["v1"].sum(), ans["v2"].sum()] chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() t_start = timeit.default_timer() -ans = x.join(medium, how="left", on="id2") +ans = x.join(medium, how="left", on="id2").collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() chk = [ans["v1"].sum(), ans["v2"].sum()] chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) print(ans.head(3), flush=True) print(ans.tail(3), flush=True) del ans @@ -130,25 +163,25 @@ question = "medium inner on factor" # q4 gc.collect() t_start = timeit.default_timer() -ans = x.join(medium, on="id5") +ans = x.join(medium, on="id5").collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() chk = [ans["v1"].sum(), ans["v2"].sum()] chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() t_start = timeit.default_timer() -ans = x.join(medium, on="id5") +ans = x.join(medium, on="id5").collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() chk = [ans["v1"].sum(), ans["v2"].sum()] chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) print(ans.head(3), flush=True) print(ans.tail(3), flush=True) del ans @@ -156,25 +189,25 @@ question = "big inner on int" # q5 gc.collect() t_start = timeit.default_timer() -ans = x.join(big, on="id3") +ans = x.join(big, on="id3").collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() chk = [ans["v1"].sum(), ans["v2"].sum()] chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) del ans gc.collect() t_start = timeit.default_timer() -ans = x.join(big, on="id3") +ans = x.join(big, on="id3").collect() print(ans.shape, flush=True) t = timeit.default_timer() - t_start m = memory_usage() t_start = timeit.default_timer() chk = [ans["v1"].sum(), ans["v2"].sum()] chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) print(ans.head(3), flush=True) print(ans.tail(3), flush=True) del ans