-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathnearest_k_points.py
More file actions
114 lines (84 loc) · 2.72 KB
/
nearest_k_points.py
File metadata and controls
114 lines (84 loc) · 2.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
"""
This problem was asked by LinkedIn.
Given a list of points, a central point, and an integer k, find the nearest k points from the central point.
For example, given the list of points [(0, 0), (5, 4), (3, 1)], the central point (1, 2), and k = 2, return [(0, 0), (3, 1)].
"""
import math
class PriorityQueue:
    """Binary max-heap of ``(priority, element)`` pairs stored in a list.

    The largest ``(priority, element)`` tuple is kept at index 0, so
    ``peek`` and ``pop`` always yield the entry with the highest priority.
    Entries are compared as whole tuples: when two priorities are equal the
    elements themselves are compared, so they must be mutually orderable in
    that case.
    """

    def __init__(self):
        # Heap-ordered list of (priority, element) tuples.
        self.heap = []

    def push(self, ele, priority):
        """Insert ``ele`` with the given ``priority``."""
        self.heap.append((priority, ele))
        self._bubble_up(len(self.heap) - 1)

    def pop(self):
        """Remove and return the highest-priority entry as ``(ele, priority)``.

        Raises:
            IndexError: if the queue is empty.
        """
        if self.is_empty():
            raise IndexError("get from empty heap")
        # Move the root to the end so it can be removed in O(1), then
        # restore the heap property from the new root downwards.
        self._swap(0, len(self.heap) - 1)
        priority, ele = self._pop()
        self._bubble_down(0)
        return (ele, priority)

    def peek(self):
        """Return the highest-priority entry as ``(ele, priority)`` without removing it.

        Raises:
            IndexError: if the queue is empty.
        """
        if self.is_empty():
            raise IndexError("peek from empty heap")
        priority, ele = self.heap[0]
        return (ele, priority)

    def is_empty(self):
        """Return True when the queue holds no entries."""
        return not self.heap

    def _bubble_down(self, ind):
        """Sift the entry at ``ind`` down until neither child is larger."""
        heap = self.heap
        length = len(heap)
        while True:
            lc, rc = self._left_child(ind), self._right_child(ind)
            if lc >= length:
                # No left child implies no right child either: ind is a leaf.
                break
            # In a max-heap the parent must be swapped with the LARGER
            # child; choosing the smaller one would leave the larger
            # sibling above its new parent and break the invariant.
            largest = lc
            if rc < length and heap[rc] > heap[lc]:
                largest = rc
            if heap[largest] > heap[ind]:
                self._swap(ind, largest)
                ind = largest
            else:
                break

    def _bubble_up(self, ind):
        """Sift the entry at ``ind`` up while it is larger than its parent."""
        heap = self.heap
        while ind > 0:
            par = self._parent(ind)
            if heap[ind] > heap[par]:
                self._swap(ind, par)
                ind = par
            else:
                break

    def _parent(self, ind):
        """Return the index of the parent of ``ind``."""
        return (ind - 1) // 2

    def _left_child(self, ind):
        """Return the index of the left child of ``ind``."""
        return (ind * 2) + 1

    def _right_child(self, ind):
        """Return the index of the right child of ``ind``."""
        return (ind * 2) + 2

    def __len__(self):
        return len(self.heap)

    def _swap(self, i, j):
        """Exchange the entries at positions ``i`` and ``j``."""
        self.heap[i], self.heap[j] = self.heap[j], self.heap[i]

    def _pop(self):
        """Remove and return the last ``(priority, ele)`` tuple of the list."""
        priority, ele = self.heap.pop()
        return (priority, ele)
def nearest_k_points(points, center, k):
    """Return the ``k`` points of ``points`` that lie closest to ``center``.

    Args:
        points: sequence of 2-D points (pairs of numbers).
        center: the 2-D point distances are measured from.
        k: how many points to return.

    Returns:
        An empty list when ``k <= 0``; ``points`` itself (same object, in
        its original order) when it holds at most ``k`` points; otherwise a
        new list of ``k`` points in the order they are popped from the
        priority queue.
    """
    if k <= 0:
        # Nothing was asked for. Without this guard the loop below would
        # call peek() on an empty queue and raise IndexError.
        return []
    if len(points) <= k:
        return points
    # The queue holds at most k candidates; its root is treated as the
    # farthest candidate kept so far, so a closer point evicts it.
    pq = PriorityQueue()
    for point in points:
        dist = distance(point, center)
        if len(pq) < k:
            pq.push(point, dist)
        elif dist < pq.peek()[1]:
            pq.pop()
            pq.push(point, dist)
    return [pq.pop()[0] for _ in range(k)]
def distance(p1, p2):
    """Return the Euclidean distance between two 2-D points.

    Each argument must unpack into exactly two coordinates ``(x, y)``.
    """
    (ax, ay), (bx, by) = p1, p2
    dx = ax - bx
    dy = ay - by
    return math.sqrt(dx ** 2 + dy ** 2)