-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathfruit-processed.ts
181 lines (164 loc) · 4.16 KB
/
fruit-processed.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
import { map, reduce, consolidate } from '@electric-sql/d2ts'
import { Store } from '@electric-sql/d2ts/store'
/** Stage an order can be in; the sample data below moves orders through these values. */
type FruitOrderStatus = 'packed' | 'shipped' | 'delivered'

/** One fruit order as stored in the `fruitOrders` store. */
type FruitOrder = {
  /** Fruit name, e.g. `'apple'`; used as the grouping key by the pipelines. */
  name: string
  /** Amount ordered; this is the value the pipelines sum. */
  quantity: number
  /** Shipping identifier; in the sample data it equals the store key. */
  shipping_id: string
  /** Current stage of the order. */
  status: FruitOrderStatus
}
/**
 * Reducer shared by both pipelines: collapses everything `reduce` collected
 * for one key into a single total.
 *
 * `reduce` hands over an array of `[quantity, diff]` pairs, where `diff` is
 * the change in the number of occurrences of that quantity. The reducer does
 * not see the key; it only knows that every pair belongs to the same key.
 *
 * @param values `[quantity, diff]` pairs for one key
 * @returns a single `[total, 1]` entry, where total is the sum of quantity * diff
 */
function sumByMultiplicity(
  values: ReadonlyArray<readonly [number, number]>,
): [number, number][] {
  let total = 0
  for (const [quantity, diff] of values) {
    total += quantity * diff
  }
  return [[total, 1]]
}

// Source store of orders, keyed by order id.
const fruitOrders = new Store<string, FruitOrder>()
// NOTE(review): registered with `pipeAll` below, but no pipeline in this file reads it.
const fruitOrders2 = new Store<string, { name: string; quantity: number }>()

// Build both pipelines in one `pipeAll` call and materialize their results.
// The second tuple element is kept under the name `_unsubscribe` and is not
// called anywhere in this file.
const [{ materializedStatus, materializedProcessed }, _unsubscribe] =
  Store.pipeAll(
    { fruitOrders, fruitOrders2 },
    ({ fruitOrders }) => {
      // Total quantity per "<name>-<status>" combination.
      const statusStream = fruitOrders.pipe(
        // debug('Raw Input'),
        map(
          ([, order]) =>
            [`${order.name}-${order.status}`, order.quantity] as [
              string,
              number,
            ],
        ),
        // debug('After Map'),
        reduce(sumByMultiplicity),
        // debug('Status Totals'),
        consolidate(),
      )

      // Total quantity per fruit name, regardless of status.
      const processedStream = fruitOrders.pipe(
        // debug('Raw Input'),
        map(([, order]) => [order.name, order.quantity] as [string, number]),
        // debug('After Map'),
        reduce(sumByMultiplicity),
        // debug('Total Processed'),
        consolidate(),
      )

      return {
        materializedStatus: Store.materialize(statusStream),
        materializedProcessed: Store.materialize(processedStream),
      }
    },
  )
/**
 * Logs the current entries of `materializedStatus` as pretty-printed JSON
 * under a "Counts by Status:" heading.
 */
function showStatus(): void {
  const snapshot = Object.fromEntries(materializedStatus.entries())
  console.log('Counts by Status:')
  const rendered = JSON.stringify(snapshot, null, 2)
  console.log(rendered)
}
/**
 * Logs the current entries of `materializedProcessed` as pretty-printed JSON
 * under a "Fruit Processed:" heading.
 */
function showProcessed(): void {
  const snapshot = Object.fromEntries(materializedProcessed.entries())
  console.log('Fruit Processed:')
  const rendered = JSON.stringify(snapshot, null, 2)
  console.log(rendered)
}
// Separator line printed between the steps of the demo.
const DIVIDER = '--------------------------------'

/**
 * Builds a sample order. The same id is used as the order's `shipping_id`
 * and, in `applyStep`, as its key in the store.
 */
function buildOrder(
  id: string,
  name: string,
  quantity: number,
  status: FruitOrder['status'],
): FruitOrder {
  return { name, quantity, shipping_id: id, status }
}

/**
 * Runs one step of the demo: announces it, writes the given orders to
 * `fruitOrders` in a single transaction, then shows the materialized status
 * and processed totals followed by a divider.
 */
function applyStep(title: string, orders: FruitOrder[]): void {
  console.log(title)
  fruitOrders.transaction((tx) => {
    for (const order of orders) {
      tx.set(order.shipping_id, order)
    }
  })
  showStatus()
  showProcessed()
  console.log(DIVIDER)
}

console.log(DIVIDER)

// Initial packing of orders
applyStep('Sending initial orders', [
  buildOrder('A001', 'apple', 100, 'packed'),
  buildOrder('B001', 'banana', 150, 'packed'),
])

// Ship 2 orders
applyStep('Shipping 2 orders', [
  buildOrder('A001', 'apple', 100, 'shipped'),
  buildOrder('B001', 'banana', 150, 'shipped'),
])

// One order arrives
applyStep('One order arrives', [buildOrder('A001', 'apple', 100, 'delivered')])
/*
Output:
--------------------------------
Sending initial orders
Counts by Status:
{
"apple-packed": 100,
"banana-packed": 150
}
Fruit Processed:
{
"apple": 100,
"banana": 150
}
--------------------------------
Shipping 2 orders
Counts by Status:
{
"apple-shipped": 100,
"banana-shipped": 150
}
Fruit Processed:
{
"apple": 100,
"banana": 150
}
--------------------------------
One order arrives
Counts by Status:
{
"banana-shipped": 150,
"apple-delivered": 100
}
Fruit Processed:
{
"apple": 100,
"banana": 150
}
--------------------------------
*/