Skip to content

Commit 203f4fc

Browse files
committed
Subscribe and unsubscribe to LSN updates
1 parent 847f0a0 commit 203f4fc

File tree

7 files changed

+52
-48
lines changed

7 files changed

+52
-48
lines changed

packages/sync-service/lib/electric/shapes/consumer.ex

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ defmodule Electric.Shapes.Consumer do
1717
alias Electric.Shapes.Consumer.Materializer
1818
alias Electric.Shapes.ConsumerRegistry
1919
alias Electric.LogItems
20-
alias Electric.LsnTracker
20+
2121
alias Electric.Postgres.Inspector
2222
alias Electric.Replication.Changes
2323
alias Electric.Replication.Changes.Transaction
@@ -126,7 +126,6 @@ defmodule Electric.Shapes.Consumer do
126126
metadata = [shape_handle: shape_handle, stack_id: stack_id]
127127
Logger.metadata(metadata)
128128
Electric.Telemetry.Sentry.set_tags_context(metadata)
129-
{:ok, _} = LsnTracker.subscribe_to_global_lsn_updates(stack_id)
130129

131130
# Shape initialization will be complete when we receive a message {:initialize_shape,
132131
# <shape>, <shape_opts>} which the ShapeCache is expected to send as soon as this process

packages/sync-service/lib/electric/shapes/consumer/effect.ex

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,14 @@ defmodule Electric.Shapes.Consumer.Effect do
55
@moduledoc false
66
defstruct []
77
end
8+
9+
defmodule SubscribeGlobalLsn do
10+
@moduledoc false
11+
defstruct []
12+
end
13+
14+
defmodule UnsubscribeGlobalLsn do
15+
@moduledoc false
16+
defstruct []
17+
end
818
end

packages/sync-service/lib/electric/shapes/consumer/event_handler/subqueries/buffering.ex

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@ defmodule Electric.Shapes.Consumer.EventHandler.Subqueries.Buffering do
2222
:trigger_dep_index,
2323
:move_in_values,
2424
:views_before_move,
25-
:views_after_move,
26-
:latest_seen_lsn
25+
:views_after_move
2726
]
2827
defstruct [
2928
:shape,
@@ -98,7 +97,6 @@ defmodule Electric.Shapes.Consumer.EventHandler.Subqueries.Buffering do
9897
views_before_move: state.views,
9998
views_after_move: views_after,
10099
dependency_handle_to_ref: state.dependency_handle_to_ref,
101-
latest_seen_lsn: state.latest_seen_lsn,
102100
queue: queue,
103101
buffer_max_transactions: state.buffer_max_transactions
104102
}
@@ -239,7 +237,11 @@ defmodule Electric.Shapes.Consumer.EventHandler.Subqueries.Buffering do
239237
txns -> txns |> List.last() |> Map.fetch!(:last_log_offset)
240238
end
241239

242-
plan = %Plan{log_ops: log_ops, ack_source_offset: ack_offset}
240+
plan = %Plan{
241+
log_ops: log_ops,
242+
effects: [%Electric.Shapes.Consumer.Effect.UnsubscribeGlobalLsn{}],
243+
ack_source_offset: ack_offset
244+
}
243245

244246
# Transition back to steady state, then drain any queued moves
245247
state
@@ -280,7 +282,6 @@ defmodule Electric.Shapes.Consumer.EventHandler.Subqueries.Buffering do
280282
dnf_plan: state.dnf_plan,
281283
views: state.views_after_move,
282284
dependency_handle_to_ref: state.dependency_handle_to_ref,
283-
latest_seen_lsn: state.latest_seen_lsn,
284285
queue: state.queue,
285286
buffer_max_transactions: state.buffer_max_transactions
286287
}

packages/sync-service/lib/electric/shapes/consumer/event_handler/subqueries/steady.ex

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ defmodule Electric.Shapes.Consumer.EventHandler.Subqueries.Steady do
1919
:dnf_plan,
2020
views: %{},
2121
dependency_handle_to_ref: %{},
22-
latest_seen_lsn: nil,
2322
queue: MoveQueue.new(),
2423
buffer_max_transactions: 1000
2524
]
@@ -31,7 +30,6 @@ defmodule Electric.Shapes.Consumer.EventHandler.Subqueries.Steady do
3130
dnf_plan: DnfPlan.t(),
3231
views: %{[String.t()] => MapSet.t()},
3332
dependency_handle_to_ref: %{String.t() => {non_neg_integer(), [String.t()]}},
34-
latest_seen_lsn: Electric.Postgres.Lsn.t() | nil,
3533
queue: MoveQueue.t(),
3634
buffer_max_transactions: pos_integer()
3735
}
@@ -55,8 +53,9 @@ defmodule Electric.Shapes.Consumer.EventHandler.Subqueries.Steady do
5553
end
5654
end
5755

58-
def handle_event(state, {:global_last_seen_lsn, lsn}) do
59-
{:ok, %{state | latest_seen_lsn: Subqueries.normalize_global_lsn(lsn)}, %Plan{}}
56+
def handle_event(state, {:global_last_seen_lsn, _lsn}) do
57+
# Straggler message after unsubscribe; ignore.
58+
{:ok, state, %Plan{}}
6059
end
6160

6261
def handle_event(state, {:materializer_changes, dep_handle, payload}) do
@@ -126,7 +125,12 @@ defmodule Electric.Shapes.Consumer.EventHandler.Subqueries.Steady do
126125

127126
plan = %{
128127
plan
129-
| effects: plan.effects ++ [%Electric.Shapes.Consumer.Effect.StartMoveInQuery{}]
128+
| effects:
129+
plan.effects ++
130+
[
131+
%Electric.Shapes.Consumer.Effect.SubscribeGlobalLsn{},
132+
%Electric.Shapes.Consumer.Effect.StartMoveInQuery{}
133+
]
130134
}
131135

132136
{:ok, buffering, plan}
@@ -150,7 +154,12 @@ defmodule Electric.Shapes.Consumer.EventHandler.Subqueries.Steady do
150154

151155
plan = %{
152156
plan
153-
| effects: plan.effects ++ [%Electric.Shapes.Consumer.Effect.StartMoveInQuery{}]
157+
| effects:
158+
plan.effects ++
159+
[
160+
%Electric.Shapes.Consumer.Effect.SubscribeGlobalLsn{},
161+
%Electric.Shapes.Consumer.Effect.StartMoveInQuery{}
162+
]
154163
}
155164

156165
{:ok, buffering, plan}

packages/sync-service/lib/electric/shapes/consumer/plan.ex

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,6 @@ defmodule Electric.Shapes.Consumer.Plan do
1616

1717
@type effect() ::
1818
%Electric.Shapes.Consumer.Effect.StartMoveInQuery{}
19+
| %Electric.Shapes.Consumer.Effect.SubscribeGlobalLsn{}
20+
| %Electric.Shapes.Consumer.Effect.UnsubscribeGlobalLsn{}
1921
end

packages/sync-service/lib/electric/shapes/consumer/plan_executor.ex

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,16 @@ defmodule Electric.Shapes.Consumer.PlanExecutor do
123123
acc
124124
end
125125

126+
defp execute_effect(%Effect.SubscribeGlobalLsn{}, acc) do
127+
{:ok, _} = Electric.LsnTracker.subscribe_to_global_lsn_updates(acc.state.stack_id)
128+
acc
129+
end
130+
131+
defp execute_effect(%Effect.UnsubscribeGlobalLsn{}, acc) do
132+
:ok = Electric.LsnTracker.unsubscribe_from_global_lsn_updates(acc.state.stack_id)
133+
acc
134+
end
135+
126136
# -- Ack --
127137

128138
defp apply_ack(acc, nil), do: acc

packages/sync-service/test/electric/shapes/consumer/event_handler_test.exs

Lines changed: 8 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ defmodule Electric.Shapes.Consumer.EventHandlerTest do
7373
{:materializer_changes, dep_handle, %{move_in: [], move_out: [{1, "1"}]}}
7474
)
7575

76-
assert %Plan{effects: [%Effect.StartMoveInQuery{}]} = plan
76+
assert %Plan{effects: [%Effect.SubscribeGlobalLsn{}, %Effect.StartMoveInQuery{}]} = plan
7777

7878
assert %Buffering{
7979
views_before_move: %{["$sublink", "0"] => before_view},
@@ -109,7 +109,8 @@ defmodule Electric.Shapes.Consumer.EventHandlerTest do
109109
handler = new_handler()
110110
dep_handle = dep_handle(handler)
111111

112-
assert {:ok, %Buffering{} = handler, %Plan{effects: [%Effect.StartMoveInQuery{}]}} =
112+
assert {:ok, %Buffering{} = handler,
113+
%Plan{effects: [%Effect.SubscribeGlobalLsn{}, %Effect.StartMoveInQuery{}]}} =
113114
EventHandler.handle_event(
114115
handler,
115116
{:materializer_changes, dep_handle, %{move_in: [{1, "1"}], move_out: []}}
@@ -326,38 +327,6 @@ defmodule Electric.Shapes.Consumer.EventHandlerTest do
326327
} = plan
327328
end
328329

329-
test "uses an lsn update that was already seen before the move-in started" do
330-
handler = new_handler()
331-
dep_handle = dep_handle(handler)
332-
333-
assert {:ok, handler, %Plan{}} =
334-
EventHandler.handle_event(handler, global_last_seen_lsn(20))
335-
336-
assert {:ok, %Buffering{} = handler, _plan} =
337-
EventHandler.handle_event(
338-
handler,
339-
{:materializer_changes, dep_handle, %{move_in: [{1, "1"}], move_out: []}}
340-
)
341-
342-
assert {:ok, %Buffering{} = handler, _plan} =
343-
EventHandler.handle_event(handler, {:pg_snapshot_known, {100, 300, []}})
344-
345-
assert {:ok, %Steady{views: views}, plan} =
346-
EventHandler.handle_event(
347-
handler,
348-
{:query_move_in_complete, [child_insert("99", "1")], lsn(20)}
349-
)
350-
351-
assert views[["$sublink", "0"]] == MapSet.new([1])
352-
353-
assert %Plan{
354-
log_ops: [
355-
%LogOp.AppendControl{message: %{headers: %{event: "move-in"}}},
356-
%LogOp.AppendMoveInSnapshot{rows: [%Changes.NewRecord{record: %{"id" => "99"}}]}
357-
]
358-
} = plan
359-
end
360-
361330
test "defers queued move outs until after splice and starts the next move in" do
362331
handler = new_handler()
363332
dep_handle = dep_handle(handler)
@@ -403,7 +372,11 @@ defmodule Electric.Shapes.Consumer.EventHandlerTest do
403372
message: %{headers: %{event: "move-out", patterns: [%{pos: 0}]}}
404373
}
405374
],
406-
effects: [%Effect.StartMoveInQuery{}]
375+
effects: [
376+
%Effect.UnsubscribeGlobalLsn{},
377+
%Effect.SubscribeGlobalLsn{},
378+
%Effect.StartMoveInQuery{}
379+
]
407380
} = plan
408381
end
409382

0 commit comments

Comments
 (0)