Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix polars groupby script to polars==latest #244

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added polars/__pycache__/groupby-polars.cpython-38.pyc
Binary file not shown.
42 changes: 21 additions & 21 deletions polars/groupby-polars.py
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@
import gc
import timeit
import polars as pl
from polars.lazy import col
from polars import col

exec(open("./_helpers/helpers.py").read())

@@ -23,26 +23,26 @@
print("loading dataset %s" % data_name, flush=True)

with pl.StringCache():
x = pl.read_csv(src_grp, dtype={"id4":pl.Int32, "id5":pl.Int32, "id6":pl.Int32, "v1":pl.Int32, "v2":pl.Int32, "v3":pl.Float64}, low_memory=True)
x["id1"] = x["id1"].cast(pl.Categorical)
x["id1"].shrink_to_fit(in_place=True)
x["id2"] = x["id2"].cast(pl.Categorical)
x["id2"].shrink_to_fit(in_place=True)
x["id3"] = x["id3"].cast(pl.Categorical)
x["id3"].shrink_to_fit(in_place=True)
x = (pl.read_csv(src_grp, dtypes={"id4":pl.Int32, "id5":pl.Int32, "id6":pl.Int32, "v1":pl.Int32, "v2":pl.Int32, "v3":pl.Float64}, low_memory=True)
.with_columns(pl.col(["id1", "id2", "id3"]).cast(pl.Categorical)))

in_rows = x.shape[0]
x.write_ipc("/tmp/tmp.ipc")
del x
x = pl.read_ipc("/tmp/tmp.ipc", memory_map=True)
x = x.lazy()

# materialize
print(len(x.collect()), flush=True)
in_rows = x.collect().shape[0]

task_init = timeit.default_timer()
print("grouping...", flush=True)

question = "sum v1 by id1" # q1
gc.collect()
t_start = timeit.default_timer()
ans = x.groupby("id1").agg(pl.sum("v1")).collect()
ans = x.groupby("id1").agg(pl.sum("v1").alias("v1_sum")).collect()
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
@@ -53,7 +53,7 @@
del ans
gc.collect()
t_start = timeit.default_timer()
ans = x.groupby("id1").agg(pl.sum("v1")).collect()
ans = x.groupby("id1").agg(pl.sum("v1").alias("v1_sum")).collect()
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
@@ -68,7 +68,7 @@
question = "sum v1 by id1:id2" # q2
gc.collect()
t_start = timeit.default_timer()
ans = x.groupby(["id1","id2"]).agg(pl.sum("v1")).collect()
ans = x.groupby(["id1","id2"]).agg(pl.sum("v1").alias("v1_sum")).collect()
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
@@ -79,7 +79,7 @@
del ans
gc.collect()
t_start = timeit.default_timer()
ans = x.groupby(["id1","id2"]).agg(pl.sum("v1")).collect()
ans = x.groupby(["id1","id2"]).agg(pl.sum("v1").alias("v1_sum")).collect()
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
@@ -94,7 +94,7 @@
question = "sum v1 mean v3 by id3" # q3
gc.collect()
t_start = timeit.default_timer()
ans = x.groupby("id3").agg([pl.sum("v1"), pl.mean("v3")]).collect()
ans = x.groupby("id3").agg([pl.sum("v1").alias("v1_sum"), pl.mean("v3").alias("v3_mean")]).collect()
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
@@ -105,7 +105,7 @@
del ans
gc.collect()
t_start = timeit.default_timer()
ans = x.groupby("id3").agg([pl.sum("v1"), pl.mean("v3")]).collect()
ans = x.groupby("id3").agg([pl.sum("v1").alias("v1_sum"), pl.mean("v3").alias("v3_mean")]).collect()
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
@@ -120,7 +120,7 @@
question = "mean v1:v3 by id4" # q4
gc.collect()
t_start = timeit.default_timer()
ans = x.groupby("id4").agg([pl.mean("v1"), pl.mean("v2"), pl.mean("v3")]).collect()
ans = x.groupby("id4").agg([pl.mean("v1").alias("v1_mean"), pl.mean("v2").alias("v2_mean"), pl.mean("v3").alias("v3_mean")]).collect()
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
@@ -131,7 +131,7 @@
del ans
gc.collect()
t_start = timeit.default_timer()
ans = x.groupby("id4").agg([pl.mean("v1"), pl.mean("v2"), pl.mean("v3")]).collect()
ans = x.groupby("id4").agg([pl.mean("v1").alias("v1_mean"), pl.mean("v2").alias("v2_mean"), pl.mean("v3").alias("v3_mean")]).collect()
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
@@ -146,7 +146,7 @@
question = "sum v1:v3 by id6" # q5
gc.collect()
t_start = timeit.default_timer()
ans = x.groupby("id6").agg([pl.sum("v1"), pl.sum("v2"), pl.sum("v3")]).collect()
ans = x.groupby("id6").agg([pl.sum("v1").alias("v1_sum"), pl.sum("v2").alias("v2_sum"), pl.sum("v3").alias("v3_sum")]).collect()
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
@@ -157,7 +157,7 @@
del ans
gc.collect()
t_start = timeit.default_timer()
ans = x.groupby("id6").agg([pl.sum("v1"), pl.sum("v2"), pl.sum("v3")]).collect()
ans = x.groupby("id6").agg([pl.sum("v1").alias("v1_sum"), pl.sum("v2").alias("v2_sum"), pl.sum("v3").alias("v3_sum")]).collect()
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
@@ -224,7 +224,7 @@
question = "largest two v3 by id6" # q8
gc.collect()
t_start = timeit.default_timer()
ans = x.drop_nulls("v3").sort("v3", reverse=True).groupby("id6").agg(col("v3").head(2).alias("largest2_v3")).explode("largest2_v3").collect()
ans = x.drop_nulls("v3").groupby("id6").agg(col("v3").top_k(2).alias("largest2_v3")).explode("largest2_v3").collect()
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
@@ -235,7 +235,7 @@
del ans
gc.collect()
t_start = timeit.default_timer()
ans = x.drop_nulls("v3").sort("v3", reverse=True).groupby("id6").agg(col("v3").head(2).alias("largest2_v3")).explode("largest2_v3").collect()
ans = x.drop_nulls("v3").groupby("id6").agg(col("v3").top_k(2).alias("largest2_v3")).explode("largest2_v3").collect()
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
@@ -299,6 +299,6 @@
print(ans.tail(3), flush=True)
del ans

print("grouping finished, took %0.fs" % (timeit.default_timer() - task_init), flush=True)
print("grouping finished, took %0.3fs" % (timeit.default_timer() - task_init), flush=True)

exit(0)
99 changes: 66 additions & 33 deletions polars/join-polars.py
Original file line number Diff line number Diff line change
@@ -27,154 +27,187 @@
print("loading datasets " + data_name + ", " + y_data_name[0] + ", " + y_data_name[2] + ", " + y_data_name[2], flush=True)

with pl.StringCache():
x = pl.read_csv(src_jn_x, dtype={"id1":pl.Int32, "id2":pl.Int32, "id3":pl.Int32, "v1":pl.Float64})
x["id4"] = x["id4"].cast(pl.Categorical)
x["id5"] = x["id5"].cast(pl.Categorical)
x["id6"] = x["id6"].cast(pl.Categorical)
small = pl.read_csv(src_jn_y[0], dtype={"id1":pl.Int32, "v2":pl.Float64})
small["id4"] = small["id4"].cast(pl.Categorical)
medium = pl.read_csv(src_jn_y[1], dtype={"id1":pl.Int32, "id2":pl.Int32, "v2":pl.Float64})
medium["id4"] = medium["id4"].cast(pl.Categorical)
medium["id5"] = medium["id5"].cast(pl.Categorical)
big = pl.read_csv(src_jn_y[2], dtype={"id1":pl.Int32, "id2":pl.Int32, "id3":pl.Int32, "v2":pl.Float64})
big["id4"] = big["id4"].cast(pl.Categorical)
big["id5"] = big["id5"].cast(pl.Categorical)
big["id6"] = big["id6"].cast(pl.Categorical)
x = (pl.read_csv(src_jn_x, dtypes={"id1":pl.Int32, "id2":pl.Int32, "id3":pl.Int32, "v1":pl.Float64})
.with_columns(
pl.col(["id4", "id5", "id6"]).cast(pl.Categorical)
)
)
small = pl.read_csv(src_jn_y[0], dtypes={"id1":pl.Int32, "v2":pl.Float64})
small = small.with_columns(
pl.col("id4").cast(pl.Categorical)
)
medium = (pl.read_csv(src_jn_y[1], dtypes={"id1":pl.Int32, "id2":pl.Int32, "v2":pl.Float64})
.with_columns(
pl.col(["id4", "id5"]).cast(pl.Categorical),
))
big = (pl.read_csv(src_jn_y[2], dtypes={"id1":pl.Int32, "id2":pl.Int32, "id3":pl.Int32, "v2":pl.Float64})
.with_columns(
pl.col(["id4", "id5", "id6"]).cast(pl.Categorical)
))

print(len(x), flush=True)
print(len(small), flush=True)
print(len(medium), flush=True)
print(len(big), flush=True)

with pl.StringCache():
x.write_ipc("/tmp/x.ipc")
del x
x = pl.read_ipc("/tmp/x.ipc", memory_map=True)
x = x.lazy()

small.write_ipc("/tmp/small.ipc")
del small
small = pl.read_ipc("/tmp/small.ipc", memory_map=True)
small = small.lazy()

medium.write_ipc("/tmp/medium.ipc")
del medium
medium = pl.read_ipc("/tmp/medium.ipc", memory_map=True)
medium = medium.lazy()

big.write_ipc("/tmp/big.ipc")
del big
big = pl.read_ipc("/tmp/big.ipc", memory_map=True)
big = big.lazy()

# materialize
print(len(x.collect()), flush=True)
print(len(small.collect()), flush=True)
print(len(medium.collect()), flush=True)
print(len(big.collect()), flush=True)

in_rows = x.collect().shape[0]

task_init = timeit.default_timer()
print("joining...", flush=True)

question = "small inner on int" # q1
gc.collect()
t_start = timeit.default_timer()
ans = x.join(small, on="id1")
ans = x.join(small, on="id1").collect()
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans["v1"].sum(), ans["v2"].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
del ans
gc.collect()
t_start = timeit.default_timer()
ans = x.join(small, on="id1")
ans = x.join(small, on="id1").collect()
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans["v1"].sum(), ans["v2"].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
print(ans.head(3), flush=True)
print(ans.tail(3), flush=True)
del ans

question = "medium inner on int" # q2
gc.collect()
t_start = timeit.default_timer()
ans = x.join(medium, on="id2")
ans = x.join(medium, on="id2").collect()
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans["v1"].sum(), ans["v2"].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
del ans
gc.collect()
t_start = timeit.default_timer()
ans = x.join(medium, on="id2")
ans = x.join(medium, on="id2").collect()
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans["v1"].sum(), ans["v2"].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
print(ans.head(3), flush=True)
print(ans.tail(3), flush=True)
del ans

question = "medium outer on int" # q3
gc.collect()
t_start = timeit.default_timer()
ans = x.join(medium, how="left", on="id2")
ans = x.join(medium, how="left", on="id2").collect()
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans["v1"].sum(), ans["v2"].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
del ans
gc.collect()
t_start = timeit.default_timer()
ans = x.join(medium, how="left", on="id2")
ans = x.join(medium, how="left", on="id2").collect()
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans["v1"].sum(), ans["v2"].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
print(ans.head(3), flush=True)
print(ans.tail(3), flush=True)
del ans

question = "medium inner on factor" # q4
gc.collect()
t_start = timeit.default_timer()
ans = x.join(medium, on="id5")
ans = x.join(medium, on="id5").collect()
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans["v1"].sum(), ans["v2"].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
del ans
gc.collect()
t_start = timeit.default_timer()
ans = x.join(medium, on="id5")
ans = x.join(medium, on="id5").collect()
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans["v1"].sum(), ans["v2"].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
print(ans.head(3), flush=True)
print(ans.tail(3), flush=True)
del ans

question = "big inner on int" # q5
gc.collect()
t_start = timeit.default_timer()
ans = x.join(big, on="id3")
ans = x.join(big, on="id3").collect()
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans["v1"].sum(), ans["v2"].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
del ans
gc.collect()
t_start = timeit.default_timer()
ans = x.join(big, on="id3")
ans = x.join(big, on="id3").collect()
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans["v1"].sum(), ans["v2"].sum()]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
print(ans.head(3), flush=True)
print(ans.tail(3), flush=True)
del ans