-
Notifications
You must be signed in to change notification settings - Fork 17
/
managers.py
138 lines (92 loc) · 3.94 KB
/
managers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import cv2
import numpy
import time
class CaptureManager(object):
def __init__(self, capture, previewWindowManager = None,
shouldMirrorPreview = False):
self.previewWindowManager =previewWindowManager
self.shouldMirrorPreview = shouldMirrorPreview
self._capture = capture
self._channel = 0
self._enteredFrame = False
self._frame = None
self._imageFilename = None
self._videoFilename = None
self._videoEncoding = None
self._videoWriter = None
self._startTime = None
self._framesElapsed = 0
self._fpsEstimate = None
@property
def channel(self):
return self._channel
@channel.setter
def channel(self,value):
if self._channel != value:
self._channel = value
self._frame = None
@property
def frame(self):
if self._enteredFrame and self._frame is None:
_, self._frame = self._capture.retrieve(self._frame, self.channel)
return self._frame
@property
def isWritingImage(self):
return self._imageFilename is not None
@property
def isWritingVideo(self):
return self._videoFilename is not None
def enterFrame(self):
assert not self._enteredFrame, \
'previous enterFrame() had no matching exitFrame()'
if self._capture is not None:
self._enteredFrame = self._capture.grab()
def exitFrame(self):
if self.frame is None:
self._enteredFrame = False
return
if self._framesElapsed == 0:
self._startTime = time.time()
else:
timeElapsed = time.time() - self._startTime
self._fpsEstimate = self._framesElapsed / timeElapsed
self._framesElapsed += 1
if self.previewWindowManager is not None:
if self.shouldMirrorPreview:
mirroredFrame = numpy.fliplr(self._frame)
self.previewWindowManager.show(mirroredFrame)
else:
self.previewWindowManager.show(self._frame)
if self.isWritingImage:
cv2.imwrite(self._imageFilename, self._frame)
self._imageFilename= None
self._writeVideoFrame()
self._frame = None
self._enteredFrame = False
def writeImage(self,filename):
"""Write the next exited frame to an image file."""
self._imageFilename = filename
def startWritingVideo(self, filename,encoding = cv2.VideoWriter_fourcc('M','J','P','G')):
"""Start writing exited frames to a video file."""
self._videoFilename = filename
self._videoEncoding = encoding
def stopWritingVideo(self):
"""Stop writing exited frames to a video file."""
self._videoFilename = None
self._videoEncoding = None
self._videoWriter = None
def _writeVideoFrame(self):
if not self.isWritingVideo:
return
if not self.isWritingVideo:
fps = self._capture.get(cv2.CAP_PROP_FPS)
if fps <= 0.0:
# The capture's FPS is unknown so use an estimate.
if self._framesElapsed < 20:
return
else:
fps = self._fpsEstimate
size = (int(self._capture.get( cv2.CAP_PROP_FRAME_WIDTH)),
int(self._capture.get( cv2.CAP_PROP_FRAME_HEIGHT)))
self._videoWriter = cv2.VideoWriter( self._videoFilename, self._videoEncoding, fps, size)
self._videoWriter.write(self._frame)