Skip to content

Commit 80134dd

Browse files
Merge pull request #34 from marmelasoft/ne/#33-unsubscribe
feat: add unsubscribe function
2 parents a1ea7ec + 89739c1 commit 80134dd

File tree

3 files changed

+256
-0
lines changed

3 files changed

+256
-0
lines changed

lib/ecto_watch.ex

+25
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,31 @@ defmodule EctoWatch do
189189
end
190190
end
191191

192+
@doc """
193+
Unsubscribe to notifications from watchers that you previously subscribe. It
194+
receives the same params for `subscribe/2`.
195+
196+
Examples:
197+
198+
iex> EctoWatch.unsubscribe({Comment, :updated})
199+
iex> EctoWatch.unsubscribe({Comment, :updated}, {:post_id, post_id})
200+
"""
201+
@spec unsubscribe(watcher_identifier(), term()) :: :ok | {:error, term()}
202+
def unsubscribe(watcher_identifier, id \\ nil) do
203+
validate_ecto_watch_running!()
204+
205+
with :ok <- validate_identifier(watcher_identifier),
206+
{:ok, {pub_sub_mod, channel_name, debug?}} <-
207+
WatcherServer.pub_sub_subscription_details(watcher_identifier, id) do
208+
if(debug?, do: debug_log(watcher_identifier, "Unsubscribing to watcher"))
209+
210+
Phoenix.PubSub.unsubscribe(pub_sub_mod, channel_name)
211+
else
212+
{:error, error} ->
213+
raise ArgumentError, error
214+
end
215+
end
216+
192217
@doc """
193218
Returns details about a watcher for reflection purposes
194219

lib/ecto_watch/watcher_trigger_validator.ex

+1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ defmodule EctoWatch.WatcherTriggerValidator do
4040
if MapSet.size(extra_found_triggers) > 0 do
4141
log_extra_triggers(extra_found_triggers)
4242
end
43+
4344
if MapSet.size(extra_found_functions) > 0 do
4445
log_extra_functions(extra_found_functions)
4546
end

test/ecto_watch_test.exs

+230
Original file line numberDiff line numberDiff line change
@@ -733,6 +733,49 @@ defmodule EctoWatchTest do
733733
assert_receive {:things_inserted, %{id: 3}}
734734
assert_receive {{Other, :inserted}, %{weird_id: 1234}}
735735
assert_receive {:other_inserted, %{weird_id: 1234}}
736+
737+
EctoWatch.unsubscribe({Thing, :inserted})
738+
739+
Ecto.Adapters.SQL.query!(
740+
TestRepo,
741+
"INSERT INTO things (the_string, the_integer, the_float, inserted_at, updated_at) VALUES ('the value', 4455, 84.52, NOW(), NOW())",
742+
[]
743+
)
744+
745+
refute_receive {{Thing, :inserted}, _}
746+
assert_receive {:things_inserted, %{id: 4}}
747+
748+
EctoWatch.unsubscribe(:things_inserted)
749+
750+
Ecto.Adapters.SQL.query!(
751+
TestRepo,
752+
"INSERT INTO things (the_string, the_integer, the_float, inserted_at, updated_at) VALUES ('the value', 4455, 84.52, NOW(), NOW())",
753+
[]
754+
)
755+
756+
refute_receive {{Thing, :inserted}, _}
757+
refute_receive {:things_inserted, _}
758+
759+
EctoWatch.unsubscribe({Other, :inserted})
760+
761+
Ecto.Adapters.SQL.query!(
762+
TestRepo,
763+
"INSERT INTO \"0xabcd\".other (weird_id, the_string, inserted_at, updated_at) VALUES (2345, 'the value', NOW(), NOW())",
764+
[]
765+
)
766+
767+
refute_receive {{Other, :inserted}, %{weird_id: 2345}}
768+
assert_receive {:other_inserted, %{weird_id: 2345}}
769+
770+
EctoWatch.unsubscribe(:other_inserted)
771+
772+
Ecto.Adapters.SQL.query!(
773+
TestRepo,
774+
"INSERT INTO \"0xabcd\".other (weird_id, the_string, inserted_at, updated_at) VALUES (3456, 'the value', NOW(), NOW())",
775+
[]
776+
)
777+
778+
refute_receive {:other_inserted, _}
736779
end
737780

738781
test "empty extra_columns list" do
@@ -769,6 +812,28 @@ defmodule EctoWatchTest do
769812
assert_receive {:things_inserted, %{id: 3}}
770813
assert_receive {{Other, :inserted}, %{weird_id: 1234}}
771814
assert_receive {:other_inserted, %{weird_id: 1234}}
815+
816+
EctoWatch.unsubscribe({Thing, :inserted})
817+
EctoWatch.unsubscribe({Other, :inserted})
818+
EctoWatch.unsubscribe(:things_inserted)
819+
EctoWatch.unsubscribe(:other_inserted)
820+
821+
Ecto.Adapters.SQL.query!(
822+
TestRepo,
823+
"INSERT INTO things (the_string, the_integer, the_float, inserted_at, updated_at) VALUES ('the value', 4455, 84.52, NOW(), NOW())",
824+
[]
825+
)
826+
827+
Ecto.Adapters.SQL.query!(
828+
TestRepo,
829+
"INSERT INTO \"0xabcd\".other (weird_id, the_string, inserted_at, updated_at) VALUES (2345, 'the value', NOW(), NOW())",
830+
[]
831+
)
832+
833+
refute_receive {{Thing, :inserted}, _}
834+
refute_receive {:things_inserted, _}
835+
refute_receive {{Other, :inserted}, _}
836+
refute_receive {:other_inserted, _}
772837
end
773838

774839
test "inserts for an association column", %{already_existing_id2: already_existing_id2} do
@@ -803,6 +868,23 @@ defmodule EctoWatchTest do
803868

804869
assert_receive {:things_parent_id_inserted,
805870
%{id: 3, parent_thing_id: ^already_existing_id2}}
871+
872+
EctoWatch.unsubscribe({Thing, :inserted}, {:parent_thing_id, already_existing_id2})
873+
874+
EctoWatch.unsubscribe(
875+
:things_parent_id_inserted,
876+
{:parent_thing_id, already_existing_id2}
877+
)
878+
879+
Ecto.Adapters.SQL.query!(
880+
TestRepo,
881+
"INSERT INTO things (the_string, the_integer, the_float, parent_thing_id, extra_field, inserted_at, updated_at) VALUES ('the other value', 8900, 24.53, #{already_existing_id2}, 'hey', NOW(), NOW())",
882+
[]
883+
)
884+
885+
refute_receive {{Thing, :inserted}, _}
886+
887+
refute_receive {:things_parent_id_inserted, _}
806888
end
807889

808890
test "column is not in list of extra_columns", %{already_existing_id2: already_existing_id2} do
@@ -920,6 +1002,31 @@ defmodule EctoWatchTest do
9201002

9211003
assert_receive {{Thing, :updated}, %{id: ^already_existing_id2}}
9221004
assert_receive {:things_updated, %{id: ^already_existing_id2}}
1005+
1006+
EctoWatch.unsubscribe({Thing, :updated})
1007+
1008+
Ecto.Adapters.SQL.query!(
1009+
TestRepo,
1010+
"UPDATE things SET the_string = 'the second new value'",
1011+
[]
1012+
)
1013+
1014+
refute_receive {{Thing, :updated}, %{id: ^already_existing_id1}}
1015+
assert_receive {:things_updated, %{id: ^already_existing_id1}}
1016+
1017+
refute_receive {{Thing, :updated}, %{id: ^already_existing_id2}}
1018+
assert_receive {:things_updated, %{id: ^already_existing_id2}}
1019+
1020+
EctoWatch.unsubscribe(:things_updated)
1021+
1022+
Ecto.Adapters.SQL.query!(
1023+
TestRepo,
1024+
"UPDATE things SET the_string = 'the third new value'",
1025+
[]
1026+
)
1027+
1028+
refute_receive {:things_updated, %{id: ^already_existing_id1}}
1029+
refute_receive {:things_updated, %{id: ^already_existing_id2}}
9231030
end
9241031

9251032
test "updates for the primary key", %{
@@ -942,6 +1049,12 @@ defmodule EctoWatchTest do
9421049
assert_receive {{Thing, :updated}, %{id: ^already_existing_id1}}
9431050

9441051
refute_receive {{Thing, :updated}, %{id: ^already_existing_id2}}
1052+
1053+
EctoWatch.unsubscribe({Thing, :updated}, already_existing_id1)
1054+
1055+
Ecto.Adapters.SQL.query!(TestRepo, "UPDATE things SET the_string = 'another new value'", [])
1056+
1057+
refute_receive {{Thing, :updated}, %{id: ^already_existing_id1}}
9451058
end
9461059

9471060
test "updates for an association column", %{
@@ -979,6 +1092,28 @@ defmodule EctoWatchTest do
9791092

9801093
assert_receive {:things_parent_id_updated,
9811094
%{id: ^already_existing_id2, parent_thing_id: ^already_existing_id1}}
1095+
1096+
EctoWatch.unsubscribe({Thing, :updated}, {:parent_thing_id, already_existing_id1})
1097+
1098+
Ecto.Adapters.SQL.query!(TestRepo, "UPDATE things SET the_string = 'another new value'", [])
1099+
1100+
refute_receive {{Thing, :updated}, %{id: ^already_existing_id2}}
1101+
1102+
assert_receive {:things_parent_id_updated,
1103+
%{id: ^already_existing_id2, parent_thing_id: ^already_existing_id1}}
1104+
1105+
EctoWatch.unsubscribe(
1106+
:things_parent_id_updated,
1107+
{:parent_thing_id, already_existing_id1}
1108+
)
1109+
1110+
Ecto.Adapters.SQL.query!(
1111+
TestRepo,
1112+
"UPDATE things SET the_string = 'yet another new value'",
1113+
[]
1114+
)
1115+
1116+
refute_receive {:things_parent_id_updated, %{id: ^already_existing_id2}}
9821117
end
9831118

9841119
test "column is not in list of extra_columns", %{already_existing_id2: already_existing_id2} do
@@ -1048,6 +1183,15 @@ defmodule EctoWatchTest do
10481183

10491184
assert_receive {:thing_custom_event, %{id: ^already_existing_id1}}
10501185
refute_receive {_, %{id: ^already_existing_id2}}
1186+
1187+
EctoWatch.unsubscribe(:thing_custom_event, already_existing_id1)
1188+
1189+
Ecto.Adapters.SQL.query!(TestRepo, "UPDATE things SET the_string = 'the new value'", [])
1190+
Ecto.Adapters.SQL.query!(TestRepo, "UPDATE things SET the_integer = 9998", [])
1191+
Ecto.Adapters.SQL.query!(TestRepo, "UPDATE things SET the_float = 99.899", [])
1192+
1193+
refute_receive {_, %{id: ^already_existing_id1}}
1194+
refute_receive {_, %{id: ^already_existing_id2}}
10511195
end
10521196

10531197
test "extra_columns option", %{
@@ -1093,6 +1237,26 @@ defmodule EctoWatchTest do
10931237
%{id: ^already_existing_id1, the_integer: 9999, the_float: 99.999}}
10941238

10951239
refute_receive {{_, :updated}, %{id: ^already_existing_id2}}
1240+
1241+
EctoWatch.unsubscribe({Thing, :updated}, already_existing_id1)
1242+
1243+
Ecto.Adapters.SQL.query!(
1244+
TestRepo,
1245+
"UPDATE things SET the_string = 'another new value' WHERE id = $1",
1246+
[already_existing_id1]
1247+
)
1248+
1249+
Ecto.Adapters.SQL.query!(TestRepo, "UPDATE things SET the_integer = 9999 WHERE id = $1", [
1250+
already_existing_id1
1251+
])
1252+
1253+
Ecto.Adapters.SQL.query!(TestRepo, "UPDATE things SET the_float = 99.999 WHERE id = $1", [
1254+
already_existing_id1
1255+
])
1256+
1257+
refute_receive {{Thing, :updated}, %{id: ^already_existing_id1}}
1258+
1259+
refute_receive {{_, :updated}, %{id: ^already_existing_id2}}
10961260
end
10971261

10981262
test "no notifications without subscribe", %{
@@ -1131,6 +1295,33 @@ defmodule EctoWatchTest do
11311295

11321296
assert_receive {{Thing, :deleted}, %{id: ^already_existing_id2}}
11331297
assert_receive {:things_deleted, %{id: ^already_existing_id2}}
1298+
1299+
EctoWatch.unsubscribe({Thing, :deleted})
1300+
1301+
Ecto.Adapters.SQL.query!(
1302+
TestRepo,
1303+
"INSERT INTO things (the_string, the_integer, the_float, inserted_at, updated_at) VALUES ('the value', 4455, 84.52, NOW(), NOW())",
1304+
[]
1305+
)
1306+
1307+
Ecto.Adapters.SQL.query!(TestRepo, "DELETE FROM things", [])
1308+
1309+
refute_receive {{Thing, :deleted}, _}
1310+
1311+
assert_receive {:things_deleted, %{id: 3}}
1312+
1313+
EctoWatch.unsubscribe(:things_deleted)
1314+
1315+
Ecto.Adapters.SQL.query!(
1316+
TestRepo,
1317+
"INSERT INTO things (the_string, the_integer, the_float, inserted_at, updated_at) VALUES ('the value', 4455, 84.52, NOW(), NOW())",
1318+
[]
1319+
)
1320+
1321+
Ecto.Adapters.SQL.query!(TestRepo, "DELETE FROM things", [])
1322+
1323+
refute_receive {{Thing, :deleted}, _}
1324+
refute_receive {:things_deleted, _}
11341325
end
11351326

11361327
test "empty extra_columns", %{
@@ -1156,6 +1347,32 @@ defmodule EctoWatchTest do
11561347

11571348
assert_receive {{Thing, :deleted}, %{id: ^already_existing_id2}}
11581349
assert_receive {:things_deleted, %{id: ^already_existing_id2}}
1350+
1351+
EctoWatch.unsubscribe({Thing, :deleted})
1352+
1353+
Ecto.Adapters.SQL.query!(
1354+
TestRepo,
1355+
"INSERT INTO things (the_string, the_integer, the_float, inserted_at, updated_at) VALUES ('the value', 4455, 84.52, NOW(), NOW())",
1356+
[]
1357+
)
1358+
1359+
Ecto.Adapters.SQL.query!(TestRepo, "DELETE FROM things", [])
1360+
1361+
refute_receive {{Thing, :deleted}, _}
1362+
assert_receive {:things_deleted, %{id: 3}}
1363+
1364+
EctoWatch.unsubscribe(:things_deleted)
1365+
1366+
Ecto.Adapters.SQL.query!(
1367+
TestRepo,
1368+
"INSERT INTO things (the_string, the_integer, the_float, inserted_at, updated_at) VALUES ('the value', 4455, 84.52, NOW(), NOW())",
1369+
[]
1370+
)
1371+
1372+
Ecto.Adapters.SQL.query!(TestRepo, "DELETE FROM things", [])
1373+
1374+
refute_receive {{Thing, :deleted}, _}
1375+
refute_receive {:things_deleted, _}
11591376
end
11601377

11611378
test "deletes for the primary key", %{
@@ -1178,6 +1395,19 @@ defmodule EctoWatchTest do
11781395
assert_receive {{Thing, :deleted}, %{id: ^already_existing_id1}}
11791396

11801397
refute_receive {{Thing, :deleted}, %{id: ^already_existing_id2}}
1398+
1399+
Ecto.Adapters.SQL.query!(
1400+
TestRepo,
1401+
"INSERT INTO things (the_string, the_integer, the_float, inserted_at, updated_at) VALUES ('the value', 4455, 84.52, NOW(), NOW())",
1402+
[]
1403+
)
1404+
1405+
EctoWatch.subscribe({Thing, :deleted}, 3)
1406+
EctoWatch.unsubscribe({Thing, :deleted}, 3)
1407+
1408+
Ecto.Adapters.SQL.query!(TestRepo, "DELETE FROM things", [])
1409+
1410+
refute_receive {{Thing, :deleted}, _}
11811411
end
11821412

11831413
test "deletes for an association column", %{

0 commit comments

Comments
 (0)