diff --git a/src/app/app.py b/src/app/app.py
index 153b18c..c2f848b 100644
--- a/src/app/app.py
+++ b/src/app/app.py
@@ -1,6 +1,9 @@
 from flask import Flask, render_template, Response, request
 import requests
 from importlib import import_module
+import io
+import base64
+import queue

 import camera_opencv
 import webbrowser
@@ -20,6 +23,8 @@
 from torch import nn
 import transforms as t
 import matplotlib.pyplot as plt
+from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas
+from matplotlib.figure import Figure
 import json
 import time
 import jsonify
@@ -31,12 +36,14 @@
 mp_holistic = mp.solutions.holistic

-label_dict = pd.read_csv('jester-v1-labels.csv', header=None)
-ges = label_dict[0].tolist()
+with open("jester-v1-labels.txt", "r") as fh:
+    gesture_labels = fh.read().splitlines()

 camera = cv2.VideoCapture(0)
 camera.set(cv2.CAP_PROP_FPS, 48)

+confidence_queue = queue.Queue(maxsize=10)
+
 app = Flask(__name__)

 @app.route('/get_model_selected', methods=['POST'])
@@ -86,9 +93,22 @@ def gen(camera):
         yield (b'--frame\r\n'
                b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')

+import collections
+import time
+
+class FPS:
+    def __init__(self, average_of=50):
+        self.frametimestamps = collections.deque(maxlen=average_of)
+    def __call__(self):
+        self.frametimestamps.append(time.time())
+        if len(self.frametimestamps) > 1:
+            return round(len(self.frametimestamps) / (self.frametimestamps[-1] - self.frametimestamps[0]), 2)
+        else:
+            return 0.0
+
 def Demo_Model_1_20BNJester_gen(camera):
     """Video streaming generator function for Demo_Model_1_20BNJester."""
-    fig, ax = plt.subplots()
+    # fig, ax = plt.subplots()
     # Set up some storage variables
     seq_len = 16
     value = 0
@@ -115,7 +135,6 @@ def Demo_Model_1_20BNJester_gen(camera):
     hist = []
     mean_hist = []
     setup = True
-    # plt.ion()

     cooldown = 0
     eval_samples = 2
@@ -123,9 +142,13 @@

     score_energy = torch.zeros((eval_samples, num_classes))

+    fps_a = FPS()
+    fps_d = FPS()
+
     while True:
         success, frame = camera.read()
         cv2.flip(frame, 1, frame)
+        # print(f"fps_all: {fps_a()}")
         if not success:
             break
@@ -142,14 +165,23 @@
         # Get model output prediction
         if len(imgs) == 16:
+
+            # print(f"detection_iter_per_sec: {fps_d()}")
+
             data = torch.cat(imgs).cuda()
             output = model(data.unsqueeze(0))
             out = (torch.nn.Softmax(dim=1)(output).data).cpu().numpy()[0]
             if len(hist) > 300:
                 mean_hist = mean_hist[1:]
                 hist = hist[1:]
+
+            # this is straight cheating: zero out the last two class scores
             out[-2:] = [0,0]
+            # Softmax output should sum to 1.
+            print(sum(out))
+
             hist.append(out)
+
             score_energy = torch.tensor(hist[-eval_samples:])
             curr_mean = torch.mean(score_energy, dim=0)
             mean_hist.append(curr_mean.cpu().numpy())
@@ -160,32 +192,29 @@
             if cooldown > 0:
                 cooldown = cooldown - 1
             if value.item() > 0.6 and indices < 25 and cooldown == 0:
-                print('Gesture:', ges[indices], '\t\t\t\t\t\t Value: {:.2f}'.format(value.item()))
+                print('Gesture:', gesture_labels[indices], '\t\t\t\t\t\t Value: {:.2f}'.format(value.item()))
                 cooldown = 16
                 pred = indices

             imgs = imgs[1:]

-            df = pd.DataFrame(mean_hist, columns=ges)
-
-            # ax.clear()
-            # df.plot.line(legend=False, figsize=(16,6),ax=ax, ylim=(0,1))
-            # if setup:
-            #     plt.show(block = False)
-            #     setup=False
-            # plt.draw()
+            # send predictions to the plotting generator
+            try:
+                confidence_queue.put_nowait(out)
+            except queue.Full:
+                print("WARNING: confidence queue is full, dropping gesture scores")
+                pass

         n += 1
         bg = np.full((480, 640, 3), 15, np.uint8)
         bg[:480, :640] = frame

-        font = cv2.FONT_HERSHEY_SIMPLEX
-        if value > 0.6:
-            cv2.putText(bg, ges[pred],(20,465), font, 1,(0,255,0),2)
-            cv2.rectangle(bg,(128,48),(640-128,480-48),(0,255,0),3)
-        for i, top in enumerate(top_3):
-            cv2.putText(bg, ges[top],(40,200-70*i), font, 1,(255,255,255),1)
-            cv2.rectangle(bg,(400,225-70*i),(int(400+out[top]*170),205-70*i),(255,255,255),3)
-
+        # font = cv2.FONT_HERSHEY_SIMPLEX
+        # if value > 0.6:
+        #     cv2.putText(bg, ges[pred],(20,465), font, 1,(0,255,0),2)
+        #     cv2.rectangle(bg,(128,48),(640-128,480-48),(0,255,0),3)
+        # for i, top in enumerate(top_3):
+        #     cv2.putText(bg, ges[top],(40,200-70*i), font, 1,(255,255,255),1)
+        #     cv2.rectangle(bg,(400,225-70*i),(int(400+out[top]*170),205-70*i),(255,255,255),3)

         ret, buffer = cv2.imencode('.jpg', bg)
         frame = buffer.tobytes()
@@ -194,6 +223,59 @@
         yield (b'--frame\r\n'
                b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')

+# TODO: handle multiple sets of labels (currently just Jester)
+def plot_png():
+
+    confidence_thresh = 0.6
+
+    pos = range(len(gesture_labels))
+
+    # create a figure object; we don't use the matplotlib GUI,
+    # so use the base figure class
+    fig = Figure(figsize=(8,4))
+    ax = fig.add_subplot(1, 1, 1)
+    bars = ax.bar(pos, np.zeros(len(gesture_labels)), align="center")
+    ax.set_ylim(0, 1)
+    ax.set_xticks(pos)
+    ax.set_xticklabels(gesture_labels, rotation=60, ha='right')
+    ax.set_xlabel("Jester gesture classes")
+    ax.set_ylabel("confidence")
+    fig.tight_layout()
+
+    while True:
+
+        try:
+            # read data from the queue
+            result = confidence_queue.get(timeout=0.2)
+
+            # update the height (and color) of each bar
+            for rect, y in zip(bars, result):
+                if y > confidence_thresh:
+                    rect.set_color("g")
+                else:
+                    rect.set_color("b")
+                rect.set_height(y)
+
+        except queue.Empty:  # no data has been returned, detection is off
+            pass
+            # print("WARNING: no results returned")
+
+        finally:
+            # write the figure image to an io buffer
+            io_buffer = io.BytesIO()
+            FigureCanvas(fig).print_png(io_buffer)
+            io_buffer.seek(0)
+
+            # pass the bytes to the webpage
+            yield (b'--frame\r\n'
+                   b'Content-Type: image/png\r\n\r\n' + io_buffer.read() + b'\r\n')
+
+
+@app.route('/accuracy_plot')
+def call_plot():
+    return Response(plot_png(),
+                    mimetype='multipart/x-mixed-replace; boundary=frame')
+
 @app.route('/Demo_Model_1_20BNJester_video_feed')
 def Demo_Model_1_20BNJester_video_feed():
     """Video streaming route. Put this in the src attribute of an img tag."""
@@ -204,6 +286,7 @@
 @app.route('/video_feed')
 def video_feed():
     """Video streaming route.
     Put this in the src attribute of an img tag."""
+    # plot_png()
     return Response(gen(camera),
                     mimetype='multipart/x-mixed-replace; boundary=frame')

diff --git a/src/app/jester-v1-labels.csv b/src/app/jester-v1-labels.txt
similarity index 100%
rename from src/app/jester-v1-labels.csv
rename to src/app/jester-v1-labels.txt
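For context on the `multipart/x-mixed-replace` responses used by `accuracy_plot` and the video feeds above: each yielded part replaces the previous one in the browser, so a plain `<img>` tag renders a live stream. A minimal self-contained sketch of the same pattern (a standalone toy app, with random bars standing in for model confidences):

```python
# Minimal sketch of the multipart/x-mixed-replace streaming pattern.
# Standalone toy app, not the project's actual routes.
import io
import time

import numpy as np
from flask import Flask, Response
from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas
from matplotlib.figure import Figure

app = Flask(__name__)

def png_stream():
    fig = Figure(figsize=(4, 2))
    ax = fig.add_subplot(1, 1, 1)
    while True:
        ax.clear()
        ax.bar(range(5), np.random.rand(5))  # stand-in for model confidences
        buf = io.BytesIO()
        FigureCanvas(fig).print_png(buf)
        # each part replaces the previous image in the browser's <img> tag
        yield (b'--frame\r\n'
               b'Content-Type: image/png\r\n\r\n' + buf.getvalue() + b'\r\n')
        time.sleep(0.2)

@app.route('/plot')
def plot():
    return Response(png_stream(),
                    mimetype='multipart/x-mixed-replace; boundary=frame')

if __name__ == "__main__":
    app.run()  # open http://127.0.0.1:5000/plot, or use it as an <img> src
```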

diff --git a/src/app/templates/index.html b/src/app/templates/index.html
index 018183e..fb0fde1 100644
--- a/src/app/templates/index.html
+++ b/src/app/templates/index.html
@@ -72,6 +72,10 @@
         OFF
+        <div>
+            gesture classification accuracy
+            <img src="{{ url_for('accuracy_plot') }}">
+        </div>
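Both pipelines decouple detection from plotting with a small bounded queue so the plotter can never stall the model loop; `main.py` below additionally evicts the oldest entry when the queue is full. A sketch of that drop-oldest hand-off in isolation (names are illustrative):

```python
# Drop-oldest hand-off between a fast producer and a slow consumer,
# mirroring the confidence_queue handling in main.py. Illustrative sketch.
import queue

confidence_queue = queue.Queue(maxsize=2)

def publish(scores):
    """Enqueue scores, evicting the oldest entry instead of blocking."""
    try:
        confidence_queue.put_nowait(scores)
    except queue.Full:
        try:
            confidence_queue.get_nowait()  # evict the stale entry
        except queue.Empty:
            pass  # consumer drained it first
        confidence_queue.put_nowait(scores)

for i in range(5):
    publish([i])                         # only the 2 most recent survive
print(list(confidence_queue.queue))      # [[3], [4]]
```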
diff --git a/test_newbackend/DemoModel.py b/test_newbackend/DemoModel.py
new file mode 100644
index 0000000..63c5516
--- /dev/null
+++ b/test_newbackend/DemoModel.py
@@ -0,0 +1,122 @@
+# Source: https://github.com/fabiopk/RT_GestureRecognition/blob/master/demo.py
+
+import torch
+import torch.nn as nn
+import math
+
+class FullModel(nn.Module):
+
+    def __init__(self, batch_size, seq_length=8):
+        super(FullModel, self).__init__()
+
+        class CNN2D(nn.Module):
+            def __init__(self, batch_size=batch_size, image_size=96, seq_length=8, in_channels=3):
+                super(CNN2D, self).__init__()
+                self.conv1 = self._create_conv_layer(in_channels=in_channels, out_channels=16)
+                self.conv2 = self._create_conv_layer(in_channels=16, out_channels=32)
+                self.conv3 = self._create_conv_layer_pool(in_channels=32, out_channels=64)
+                self.conv4 = self._create_conv_layer_pool(in_channels=64, out_channels=128)
+                self.conv5 = self._create_conv_layer_pool(in_channels=128, out_channels=256)
+                cnn_output_shape = int(256*(image_size/(2**4))**2)
+
+            def forward(self, x):
+                batch_size, frames, channels, width, height = x.shape
+                x = x.view(-1, channels, width, height)
+                x = self.conv1(x)
+                x = self.conv2(x)
+                x = self.conv3(x)
+                x = self.conv4(x)
+                x = self.conv5(x)
+                return x
+
+            def _create_conv_layer(self, in_channels, out_channels, kernel_size=(3,3), padding=(1,1)):
+                return nn.Sequential(
+                    nn.Conv2d(in_channels, out_channels, kernel_size, padding=padding),
+                    nn.BatchNorm2d(out_channels),
+                    nn.ReLU(),
+                )
+
+            def _create_conv_layer_pool(self, in_channels, out_channels, kernel_size=(3,3), padding=(1,1), pool=(2,2)):
+                return nn.Sequential(
+                    nn.Conv2d(in_channels, out_channels, kernel_size, padding=padding),
+                    nn.BatchNorm2d(out_channels),
+                    nn.ReLU(),
+                    nn.MaxPool2d(pool)
+                )
+
+        class CNN3D(nn.Module):
+            def __init__(self, batch_size=batch_size, image_size=96, seq_length=8):
+                super(CNN3D, self).__init__()
+                self.conv1 = self._create_conv_layer_pool(in_channels=256, out_channels=256, pool=(1,1,1))
+                self.conv2 = self._create_conv_layer_pool(in_channels=256, out_channels=256, pool=(2,2,2))
+                self.conv3 = self._create_conv_layer_pool(in_channels=256, out_channels=256, pool=(2,1,1))
+                self.conv4 = self._create_conv_layer_pool(in_channels=256, out_channels=256, pool=(2,2,2))
+
+            def forward(self, x):
+                batch_size, channels, frames, width, height = x.shape
+                x = self.conv1(x)
+                x = self.conv2(x)
+                x = self.conv3(x)
+                x = self.conv4(x)
+                return x
+
+            def _create_conv_layer(self, in_channels, out_channels, kernel_size=(3,3,3), padding=(1,1,1)):
+                return nn.Sequential(
+                    nn.Conv3d(in_channels, out_channels, kernel_size, padding=padding),
+                    nn.BatchNorm3d(out_channels),
+                    nn.ReLU(),
+                )
+
+            def _create_conv_layer_pool(self, in_channels, out_channels, kernel_size=(3,3,3), padding=(1,1,1), pool=(1,2,2)):
+                return nn.Sequential(
+                    nn.Conv3d(in_channels, out_channels, kernel_size, padding=padding),
+                    nn.BatchNorm3d(out_channels),
+                    nn.ReLU(),
+                    nn.MaxPool3d(pool)
+                )
+
+        class Combiner(nn.Module):
+
+            def __init__(self, in_features):
+                super(Combiner, self).__init__()
+                self.linear1 = self._create_linear_layer(in_features, in_features//2)
+                self.linear2 = self._create_linear_layer(in_features//2, 1024)
+                self.linear3 = self._create_linear_layer(1024, 27)
+
+            def forward(self, x):
+                x = self.linear1(x)
+                x = self.linear2(x)
+                x = self.linear3(x)
+                return x
+
+            def _create_linear_layer(self, in_features, out_features, p=0.6):
+                return nn.Sequential(
+                    nn.Linear(in_features, out_features),
+                    nn.Dropout(p=p)
+                )
+
+        self.rgb2d = CNN2D(batch_size)
+        self.rgb3d = CNN3D(batch_size)
+        self.combiner = Combiner(4608)
+
+        self.batch_size = batch_size
+        self.seq_length = seq_length
+        self.steps = 0
+        self.epochs = 0
+        self.best_validation_loss = math.inf
+
+    def forward(self, x):
+        self.batch_size = x.shape[0]
+        x = self.rgb2d(x)
+        batch_and_frames, channels, dim1, dim2 = x.shape
+        x = x.view(self.batch_size, -1, channels, dim1, dim2).permute(0,2,1,3,4)
+        x = self.rgb3d(x)
+        x = x.view(self.batch_size, -1)
+        x = self.combiner(x)
+
+        if self.training:
+            self.steps += 1
+
+        return x
\ No newline at end of file
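`FullModel` folds the frame axis into the batch axis for the per-frame 2D CNN, then unfolds it channels-first for the 3D CNN; the `view`/`permute` in `forward` is where the axes change. A rough shape trace for the demo input of 16 frames at 96×96, plus a CPU smoke test (a sketch that assumes `DemoModel.py` is on the import path; the numbers follow the pooling configuration above):

```python
# Shape trace through FullModel for a (1, 16, 3, 96, 96) clip (sketch):
#   (1, 16, 3, 96, 96)    input: batch, frames, channels, H, W
#   (16, 3, 96, 96)       view(-1, C, H, W): frames folded into the batch axis
#   (16, 256, 12, 12)     CNN2D: three (2,2) max-pools -> 96 / 8 = 12
#   (1, 256, 16, 12, 12)  view + permute: frames unfolded, channels-first for Conv3d
#   (1, 256, 2, 3, 3)     CNN3D: pools (1,1,1), (2,2,2), (2,1,1), (2,2,2)
#   (1, 4608)             flatten: 256 * 2 * 3 * 3 = 4608, matching Combiner(4608)
#   (1, 27)               Combiner: one score per Jester class
import torch
from DemoModel import FullModel

model = FullModel(batch_size=1, seq_length=16).eval()
with torch.no_grad():
    out = model(torch.zeros(1, 16, 3, 96, 96))
print(out.shape)  # torch.Size([1, 27])
```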
diff --git a/test_newbackend/README.md b/test_newbackend/README.md
new file mode 100644
index 0000000..6783e5b
--- /dev/null
+++ b/test_newbackend/README.md
@@ -0,0 +1,30 @@
+# New Backend
+This new backend relies on a shared memory buffer to separate the
+capturing and storing of image sequences from a model implementation.
+
+### usage
+`python test_newbackend/main.py`
+> tested on: debian bullseye, python v3.9.2, ROCm stack v4.3.0
+
+At the moment there is no real user interface.
+This demo uses opencv windows to display the chart and the most recent image frame.
+It runs for 2 minutes and then kills itself; `ctrl+c` should kill it early.
+
+### TODOs
+- [ ] integrate with GUI
+    - [ ] `@ianzur`: expected it to be possible to use with a flask backend, similar to [celery](https://github.com/celery/celery), but did not investigate implementing it.
+    > for a web app it may make more sense to move towards a java implementation
+- [ ] instead of hacking the changes into ringbuffer, subclass it
+
+**Notes:**
+- 2 files in this folder are directly copied from `./src/app/`
+    - model structure definition: `DemoModel.py`
+    - model weights: `demo.ckp`
+- RingBuffer implementation, see: https://github.com/ctrl-labs/cringbuffer
+    - Changes:
+        - the writer is allowed to overwrite entries before they are read by the model class.
+        > This allows readers to always have the newest frame (if model execution is slow, the camera fps remains constant).
+        - reader pointers are ignored; the buffer does not track where the readers are. A reader always reads the `n` most recent frames; the writer position is used to locate the most recent frame.
+
+**contact**
+- questions, concerns? raise an issue and tag `@ianzur`, or send me an email: `ian dot zurutuza at gmail dot com`

diff --git a/test_newbackend/demo.ckp b/test_newbackend/demo.ckp
new file mode 100644
index 0000000..a8a3cab
Binary files /dev/null and b/test_newbackend/demo.ckp differ
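The "reader always reads the `n` most recent frames" change described above reduces to index arithmetic against the writer's monotonically increasing counter. A pure-Python sketch of the wrap-around read that `ringbuffer.py` later implements over shared memory with `ctypes.memmove`:

```python
# Pure-Python sketch of "read the n most recent slots" as implemented by
# _try_read_no_lock in ringbuffer.py (illustrative, no shared memory).
def read_latest(slots, writer_counter, n):
    """Return the last n written slots, oldest first, handling wrap-around."""
    slot_count = len(slots)
    start = writer_counter - n          # counter of the oldest wanted slot
    if start < 0:
        raise ValueError("writer has not produced n slots yet")
    return [slots[(start + i) % slot_count] for i in range(n)]

slots = [None] * 4                      # ring with 4 slots
for counter in range(6):                # the writer overwrites freely
    slots[counter % 4] = f"frame{counter}"
print(read_latest(slots, writer_counter=6, n=3))
# ['frame3', 'frame4', 'frame5']
```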
diff --git a/test_newbackend/main.py b/test_newbackend/main.py
new file mode 100644
index 0000000..efb87fc
--- /dev/null
+++ b/test_newbackend/main.py
@@ -0,0 +1,287 @@
+"""testing new backend
+
+backend demo, with no real frontend. Runs for 2 minutes, then kills itself.
+
+Questions, concerns?
+raise an issue and tag `@ianzur`, or email ian dot zurutuza at gmail dot com
+"""
+import ctypes
+import multiprocessing
+import time
+import logging
+import queue
+
+import cv2
+import numpy as np
+
+# matplotlib backend (opencv windows are used to display the webcam and the graph)
+from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas
+from matplotlib.figure import Figure
+
+# the current model implementation uses torch
+# TODO: only use "cuda" if available
+import torch
+import torchvision.transforms
+
+# model definition (torch nn.Module)
+from DemoModel import FullModel
+
+# from utils.image_generator import imgGenerator
+from ringbuffer import RingBuffer, WriterFinishedError
+from utils.fps import FPS
+
+# define the size of the images to be stored in the buffer
+IMG_WIDTH = 640
+IMG_HEIGHT = 480
+IMG_CHANNELS = 3
+
+# RECORDING_FREQ = 0.060  # seconds; ~15 fps
+RECORDING_FREQ = 0.030  # seconds; ~30 fps
+
+
+class Frame(ctypes.Structure):
+    """c struct representing a frame and its timestamp"""
+    _fields_ = [
+        ("timestamp_us", ctypes.c_ulonglong),
+        ("frame", ctypes.c_ubyte * IMG_CHANNELS * IMG_WIDTH * IMG_HEIGHT)
+    ]
+
+
+def camera_proc(ring_buffer):
+
+    cap = cv2.VideoCapture(0)
+
+    # i = 0
+    cam_fps = FPS()
+
+    img = np.empty((IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS), dtype=np.uint8)
+
+    while True:
+
+        # see utils/image_generator
+        # img = next(img_gen)
+        time_s = time.time()
+
+        # opencv doesn't respond to cap.set(cv2.CAP_PROP_FPS, 30),
+        # so instead manually force the recording frequency:
+        # grab frames from the driver buffer until the expected time has elapsed
+        while True:
+            ret = cap.grab()
+            if (time.time() - time_s) > RECORDING_FREQ:
+                break
+
+        # read the "grabbed" image
+        ret, img = cap.retrieve()
+        if not ret:
+            break
+
+        # resize to the expected size, convert to RGB
+        img = cv2.resize(img, (IMG_WIDTH, IMG_HEIGHT))
+        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
+
+        # create the struct (timestamp in microseconds)
+        frame = Frame(int(time_s * 1e6), np.ctypeslib.as_ctypes(img))
+
+        # write the struct
+        ring_buffer.write(frame)
+
+        # logger.info(f"camera: {cam_fps():.2f} fps")
+
+        # if i and i % 100 == 0:
+        #     print('Wrote %d so far' % i)
+        # i += 1
+
+    ring_buffer.writer_done()
+    logger.info('capture proc exited')
+
+
+def model_reader(ring_buffer, n, confidence_queue):
+
+    logger.info("initializing model")
+    t = time.time()
+
+    model = FullModel(batch_size=1, seq_length=16)
+    loaded_dict = torch.load('test_newbackend/demo.ckp')
+    model.load_state_dict(loaded_dict)
+    model = model.cuda()
+    model.eval()
+
+    # call the model on dummy data to build the graph
+    model(torch.zeros((1, 16, 3, 96, 96), dtype=torch.float32).cuda())
+
+    std, mean = [0.2674, 0.2676, 0.2648], [0.6, 0.6, 0.4]
+    transform = torchvision.transforms.Compose([
+        torchvision.transforms.Resize((120, 160)),
+        torchvision.transforms.CenterCrop((96, 96)),
+        torchvision.transforms.Normalize(std=std, mean=mean),
+    ])
+
+    model_fps = FPS()
+
+    logger.info(f"initialization finished, elapsed time: {time.time() - t:.2f} seconds")
+    logger.info("beginning detection")
+    while True:
+
+        try:
+            data = ring_buffer.blocking_read(None, length=n)
+        except WriterFinishedError:
+            break
+
+        # structured array
+        tp = np.dtype(Frame)
+        arr = np.frombuffer(data, dtype=tp)
+
+        # accessing the structured numpy array
+        timestamps = arr["timestamp_us"]
+        frames = arr["frame"]
+        # print(frames.shape)  # (16, 480, 640, 3)
+
+        # format the array as expected by torch:
+        # float tensor in [0, 1], shape (frames, channels, height, width)
+        frames = frames.transpose(0, 3, 1, 2) / 255.
+        # print(frames.shape)  # (16, 3, 480, 640)
+
+        # convert np.array to torch.tensor
+        frame_tensor = torch.Tensor(frames)
+
+        # preprocess the frames
+        imgs = transform(frame_tensor).cuda()
+        # print(imgs.shape)  # (16, 3, 96, 96)
+        # print(imgs.dtype)
+
+        # predict on the frames (after adding a batch dim)
+        output = model(imgs.unsqueeze(dim=0))
+
+        # model output
+        confidences = (torch.nn.Softmax(dim=1)(output).data).cpu().numpy()[0]
+
+        # logger.info(f"model : {model_fps():.2f} predictions / second")
+
+        # put the confidences in the output queue,
+        # dropping the oldest entry if the queue is full
+        try:
+            confidence_queue.put_nowait(confidences)
+        except queue.Full:
+            try:
+                confidence_queue.get()
+            except queue.Empty:
+                pass
+            finally:
+                confidence_queue.put_nowait(confidences)
+
+    # print('Reader %r is done' % id(pointer))
+
+
+def display_reader(ring_buffer, n: int = 1):
+
+    cv2.namedWindow("test")
+
+    while True:
+
+        try:
+            data = ring_buffer.blocking_read(None, length=n)
+        except WriterFinishedError:
+            break
+
+        # structured array
+        tp = np.dtype(Frame)
+        arr = np.frombuffer(data, dtype=tp)
+
+        # accessing the structured array
+        timestamps = arr["timestamp_us"]
+        frames = arr["frame"]
+
+        # print(f"Reader saw records at timestamp {timestamps[0]} to {timestamps[-1]}, frame_shape={frames.shape}")
+
+        cv2.imshow("test", cv2.flip(cv2.cvtColor(frames[0], cv2.COLOR_RGB2BGR), 1))
+        cv2.waitKey(1)
+
+    cv2.destroyAllWindows()
+    # print('Reader %r is done' % id(pointer))
+
+
+def plot_png(confidence_queue):
+
+    # confidence > confidence_thresh ? turn bar (patch) green : turn bar (patch) blue
+    confidence_thresh = 0.6
+
+    # read the jester labels
+    with open("src/app/jester-v1-labels.txt", "r") as fh:
+        gesture_labels = fh.read().splitlines()
+
+    pos = range(len(gesture_labels))
+
+    # create a figure object; we don't use the matplotlib GUI,
+    # so use the base figure class
+    fig = Figure(figsize=(8, 4))
+    ax = fig.add_subplot(1, 1, 1)
+    bars = ax.bar(pos, np.zeros(len(gesture_labels)), align="center")
+    ax.set_ylim(0, 1)
+    ax.set_xticks(pos)
+    ax.set_xticklabels(gesture_labels, rotation=60, ha='right')
+    ax.set_xlabel("Jester gesture classes")
+    ax.set_ylabel("confidence")
+    fig.tight_layout()
+
+    while True:
+
+        try:
+            # read data from the queue
+            confidences = confidence_queue.get_nowait()
+
+            # update the height for each bar
+            for rect, y in zip(bars, confidences):
+                if y > confidence_thresh:
+                    rect.set_color("g")
+                else:
+                    rect.set_color("b")
+                rect.set_height(y)
+
+        except queue.Empty:  # no data has been returned, detection is off
+            continue
+
+        finally:
+            # hacky, already implemented for the web UI
+            canvas = FigureCanvas(fig)
+            canvas.draw()
+            buf = canvas.buffer_rgba()
+
+            X = np.asarray(buf)
+
+            cv2.imshow("test_plot", cv2.cvtColor(X, cv2.COLOR_RGBA2BGR))
+            cv2.waitKey(1)
+
+    cv2.destroyAllWindows()
+
+
+if __name__ == "__main__":
+    logger = multiprocessing.log_to_stderr(logging.INFO)
+
+    # define a ring buffer large enough to hold N "frames" (the ctypes.Structure defined above)
+    ring_buffer = RingBuffer(c_type=Frame, slot_count=32)
+
+    # define a queue to pass confidence data to the plotting process;
+    # kept small to prevent the plot from falling behind
+    confidence_queue = multiprocessing.Queue(maxsize=2)
+
+    ring_buffer.new_writer()
+
+    # reader pointers are ignored, so it is probably safe to remove this
+    # ring_buffer.new_reader()
+    # ring_buffer.new_reader()
+
+    processes = [
+        multiprocessing.Process(target=model_reader, args=(ring_buffer, 16, confidence_queue,)),
+        multiprocessing.Process(target=display_reader, args=(ring_buffer, 1,)),
+        multiprocessing.Process(target=camera_proc, args=(ring_buffer,)),
+        multiprocessing.Process(target=plot_png, args=(confidence_queue,))
+    ]
+
+    for p in processes:
+        p.daemon = True
+        p.start()
+
+    # only run for 2 minutes
+    time.sleep(120)
+
+    for p in processes:
+        p.terminate()
\ No newline at end of file
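`main.py` moves frames between processes as raw `Frame` structs and recovers them with `np.frombuffer` over a structured dtype. A tiny round-trip with a 4×4 stand-in image shows the same mechanics (a sketch; the real struct is 640×480):

```python
# Round-trip of a ctypes frame struct through raw bytes and back via a
# structured numpy dtype, as done in model_reader/display_reader above.
import ctypes
import numpy as np

W, H, C = 4, 4, 3

class Frame(ctypes.Structure):
    _fields_ = [
        ("timestamp_us", ctypes.c_ulonglong),
        ("frame", ctypes.c_ubyte * C * W * H),
    ]

img = np.arange(H * W * C, dtype=np.uint8).reshape(H, W, C)
f = Frame(123456, np.ctypeslib.as_ctypes(img))

raw = bytes(f)                               # what the ring buffer stores
arr = np.frombuffer(raw, dtype=np.dtype(Frame))
print(arr["timestamp_us"][0])                # 123456
print(np.array_equal(arr["frame"][0], img))  # True
```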
diff --git a/test_newbackend/ringbuffer.py b/test_newbackend/ringbuffer.py
new file mode 100644
index 0000000..30e95ae
--- /dev/null
+++ b/test_newbackend/ringbuffer.py
@@ -0,0 +1,387 @@
+"""Ring buffers for multiprocessing.
+
+Allows multiple child Python processes started via the multiprocessing module
+to read from a shared ring buffer in the parent process. For each child, a
+pointer is maintained for the purpose of reading. One pointer is maintained
+for the purpose of writing. Reads may be issued in blocking or non-blocking
+mode. Writes are always in non-blocking mode.
+"""
+
+import ctypes
+import contextlib
+import multiprocessing
+from typing import Type
+
+
+class Error(Exception):
+    pass
+
+
+class DataTooLargeError(Error, ValueError):
+    pass
+
+
+class WaitingForReaderError(Error):
+    pass
+
+
+class WaitingForWriterError(Error):
+    pass
+
+
+class WriterFinishedError(Error):
+    pass
+
+
+class AlreadyClosedError(Error):
+    pass
+
+
+class MustCreatedReadersBeforeWritingError(Error):
+    pass
+
+
+class InternalLockingError(Error):
+    pass
+
+
+class InsufficientDataError(Error):
+    pass
+
+
+class Position:
+    def __init__(self, slot_count):
+        self.counter = 0
+        self.slot_count = slot_count
+
+    @property
+    def index(self):
+        return self.counter % self.slot_count
+
+    @property
+    def generation(self):
+        return self.counter // self.slot_count
+
+
+class Pointer:
+    def __init__(self, slot_count, *, start=None):
+        default = start if start is not None else 0
+        self.counter = multiprocessing.RawValue(ctypes.c_longlong, default)
+        self.position = Position(slot_count)
+
+    def increment(self):
+        self.counter.value += 1
+
+    def increment_by(self, n: int):
+        self.counter.value += n
+
+    def get(self):
+        # Avoid reallocating a Position object on every call.
+        self.position.counter = self.counter.value
+        return self.position
+
+    def set(self, counter: int):
+        self.counter.value = counter
+
+
+class RingBuffer:
+    """Circular buffer class accessible to multiple threads or child processes.
+
+    All methods are thread safe. Multiple readers and writers are permitted.
+    Before kicking off multiprocessing.Process instances, first allocate all
+    of the writers you'll need with new_writer() and readers with new_reader().
+    Pass the Pointer value returned by the new_reader() method to the
+    multiprocessing.Process constructor along with the RingBuffer instance.
+    Calling new_writer() or new_reader() from a child multiprocessing.Process
+    will not work.
+    """
+
+    def __init__(self, *, c_type: Type[ctypes._SimpleCData], slot_count: int):
+        """Initializer.
+
+        Args:
+            c_type: Ctypes type of the slots.
+            slot_count: How many slots should be in the buffer.
+        """
+        self.slot_count = slot_count
+        self.array = multiprocessing.RawArray(c_type, slot_count)
+        self.c_type = c_type
+        self.slot_size = ctypes.sizeof(self.c_type)
+        self.lock = ReadersWriterLock()
+        # Each reading process may modify its own Pointer while the read
+        # lock is being held. Each reading process can also load the position
+        # of the writer, but not load any other readers. Each reading process
+        # can also load the value of the 'active' count.
+        self.readers = []
+        # The writer can load and store the Pointer of all the reader Pointers
+        # or the writer Pointer while the write lock is held. It can also load
+        # and store the value of the 'active' count.
+        self.writer = Pointer(self.slot_count)
+        self.active = multiprocessing.RawValue(ctypes.c_uint, 0)
+
+    def new_reader(self) -> Pointer:
+        """Returns a new unique reader into the buffer.
+
+        This must only be called in the parent process. It must not be
+        called in a child multiprocessing.Process. See the class docstring.
+        To enforce this policy, no readers may be allocated after the first
+        write has occurred.
+        """
+        with self.lock.for_write():
+            writer_position = self.writer.get()
+            if writer_position.counter > 0:
+                raise MustCreatedReadersBeforeWritingError
+
+            reader = Pointer(self.slot_count, start=writer_position.counter)
+            self.readers.append(reader)
+            return reader
+
+    def new_writer(self) -> None:
+        """Must be called once by each writer before any reads occur.
+
+        Should be paired with a single subsequent call to writer_done() to
+        indicate that this writer has finished and will not write any more
+        data into the ring.
+        """
+        with self.lock.for_write():
+            self.active.value += 1
+
+    def _has_write_conflict(self, position: Position) -> bool:
+        index = position.index
+        generation = position.generation
+        for reader in self.readers:
+            # This Position and the other Position both point at the same index
+            # in the ring buffer, but they have different generation numbers.
+            # This means the writer can't proceed until some readers have
+            # sufficiently caught up.
+            reader_position = reader.get()
+            if (reader_position.index == index
+                    and reader_position.generation < generation):
+                return True
+
+        return False
+
+    def write(self, data: ctypes._SimpleCData) -> None:
+        """Writes to the next slot, but will not block.
+
+        Once a successful write occurs, all pending blocking_read() calls
+        will be woken up to consume the newly written slot.
+
+        Args:
+            data: Object to write in the next available slot.
+
+        Raises:
+            WaitingForReaderError: If all of the slots are full and we need
+                to wait for readers to catch up before there will be
+                sufficient room to write more data. This is a sign that
+                the readers can't keep up with the writer. Consider calling
+                force_reader_sync() if you need to force the readers to
+                catch up, but beware that means they will miss data.
+        """
+        with self.lock.for_write():
+            if not self.active.value:
+                raise AlreadyClosedError
+
+            position = self.writer.get()
+            # ignore write conflicts
+            # if self._has_write_conflict(position):
+            #     raise WaitingForReaderError
+
+            self.array[position.index] = data
+            self.writer.increment()
+
+    # def try_write_multiple(self, data: ctypes.Array) -> None:
+    #     with self.lock.for_write():
+    #         if not self.active.value:
+    #             raise AlreadyClosedError
+
+    #         for item in data:
+    #             position = self.writer.get()
+    #             if self._has_write_conflict(position):
+    #                 raise WaitingForReaderError
+
+    #             self.array[position.index] = item
+    #             self.writer.increment()
+
+    def _try_read_no_lock(self, reader: Pointer,
+                          length: int = 1) -> ctypes._SimpleCData:
+        # position = reader.get()
+        writer_position = self.writer.get()
+
+        # We want to read the last n frames, regardless of whether they have
+        # been read before: use the writer's current position to find ours.
+        position = writer_position
+        position.counter = writer_position.counter - length
+
+        # if the position counter is negative, there is not enough data
+        # recorded yet; wait for the writer
+        if position.counter < 0:
+            raise WaitingForWriterError
+
+        new_array = (self.c_type * length)()
+
+        remaining = self.slot_count - position.index
+        ctypes.memmove(new_array,
+                       ctypes.addressof(self.array[position.index]),
+                       self.slot_size * min(length, remaining))
+        if length > remaining:
+            ctypes.memmove(
+                ctypes.addressof(new_array[remaining]),
+                ctypes.addressof(self.array),
+                (length - remaining) * self.slot_size)
+
+        # reader.increment_by(length)
+        return new_array
+
+    def try_read(self, reader: Pointer, length: int = 1):
+        """Tries to read the next slots for a reader, but will not block.
+
+        Args:
+            reader: Pointer previously returned by the call to new_reader().
+
+        Returns:
+            ctypes array of c_type objects.
+
+        Raises:
+            WriterFinishedError: If the RingBuffer was closed before this
+                read operation began.
+            WaitingForWriterError: If the given reader has already consumed
+                all the data in the ring buffer and would need to block in
+                order to wait for new data to arrive.
+        """
+        with self.lock.for_read():
+            return self._try_read_no_lock(reader, length=length)
+
+    def try_read_all(self, reader: Pointer):
+        with self.lock.for_read():
+            length = self.writer.get().counter - reader.get().counter
+            return self._try_read_no_lock(reader, length=length)
+
+    def blocking_read(self, reader, length: int = 1):
+        """Reads the next slots for a reader, blocking if they aren't filled yet.
+
+        Args:
+            reader: Pointer previously returned by the call to new_reader().
+
+        Returns:
+            ctypes array containing a copy of the data from the slots. This
+            value is mutable and can be used to back ctypes objects, NumPy
+            arrays, etc.
+
+        Raises:
+            WriterFinishedError: If the RingBuffer was closed while waiting
+                to read the next operation.
+        """
+        with self.lock.for_read():
+            while True:
+                try:
+                    return self._try_read_no_lock(reader, length=length)
+                except WaitingForWriterError:
+                    self.lock.wait_for_write()
+
+    def force_reader_sync(self):
+        """Forces all readers to skip to the position of the writer."""
+        with self.lock.for_write():
+            writer_position = self.writer.get()
+
+            for reader in self.readers:
+                reader.set(writer_position.counter)
+
+            for reader in self.readers:
+                reader.get()
+
+    def writer_done(self):
+        """Called by the writer when no more data is expected to be written.
+
+        Should be called once for every corresponding call to new_writer().
+        Once all writers have called writer_done(), a WriterFinishedError
+        exception will be raised by any blocking read calls or subsequent
+        calls to read.
+        """
+        with self.lock.for_write():
+            self.active.value -= 1
+
+
+class ReadersWriterLock:
+    """Multiprocessing-compatible readers/writer lock.
+
+    The algorithm:
+    https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock#Using_a_condition_variable_and_a_mutex
+
+    Background on the kernel:
+    https://www.kernel.org/doc/Documentation/memory-barriers.txt
+
+    sem_wait on Linux uses NPTL, which uses futexes:
+    https://github.com/torvalds/linux/blob/master/kernel/futex.c
+
+    Notably, futexes use the smp_mb() memory fence, which is a general write
+    barrier, meaning we can assume that all memory reads and writes before
+    a barrier will complete before reads and writes after the barrier, even
+    if the semaphore / futex isn't actively held.
+    """
+
+    def __init__(self):
+        self.lock = multiprocessing.Lock()
+        self.readers_condition = multiprocessing.Condition(self.lock)
+        self.writer_condition = multiprocessing.Condition(self.lock)
+        self.readers = multiprocessing.RawValue(ctypes.c_uint, 0)
+        self.writer = multiprocessing.RawValue(ctypes.c_bool, False)
+
+    def _acquire_reader_lock(self):
+        with self.lock:
+            while self.writer.value:
+                self.readers_condition.wait()
+
+            self.readers.value += 1
+
+    def _release_reader_lock(self):
+        with self.lock:
+            self.readers.value -= 1
+
+            if self.readers.value == 0:
+                self.writer_condition.notify()
+
+    @contextlib.contextmanager
+    def for_read(self):
+        """Acquire the lock for reading."""
+        self._acquire_reader_lock()
+        try:
+            yield
+        finally:
+            self._release_reader_lock()
+
+    def _acquire_writer_lock(self):
+        with self.lock:
+            while self.writer.value or self.readers.value > 0:
+                self.writer_condition.wait()
+
+            self.writer.value = True
+
+    def _release_writer_lock(self):
+        with self.lock:
+            self.writer.value = False
+            self.readers_condition.notify_all()
+            self.writer_condition.notify()
+
+    @contextlib.contextmanager
+    def for_write(self):
+        """Acquire the lock for writing."""
+        self._acquire_writer_lock()
+        try:
+            yield
+        finally:
+            self._release_writer_lock()
+
+    def wait_for_write(self):
+        """Block until a writer has notified readers.
+
+        Must be called while the read lock is already held. May return
+        spuriously before the writer actually did something.
+        """
+        with self.lock:
+            if self.readers.value == 0:
+                raise InternalLockingError
+            self.readers.value -= 1
+            self.writer_condition.notify()
+            self.readers_condition.wait()
+            self.readers.value += 1
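As the class docstring requires, writers are allocated in the parent before forking. A minimal usage sketch of this modified buffer, with `c_double` slots standing in for the `Frame` struct (counts and sleeps are arbitrary):

```python
# Minimal parent/child usage of the modified RingBuffer: allocate in the
# parent, fork, write from one process, read the newest slots from another.
import ctypes
import multiprocessing
import time

from ringbuffer import RingBuffer

def producer(ring):
    for i in range(64):
        ring.write(ctypes.c_double(float(i)))
        time.sleep(0.01)
    ring.writer_done()

def consumer(ring):
    # per the changes above, reads are not tracked: blocking_read(None, n)
    # always returns the n most recently written slots
    for _ in range(10):
        chunk = ring.blocking_read(None, length=4)
        print([v for v in chunk])

if __name__ == "__main__":
    ring = RingBuffer(c_type=ctypes.c_double, slot_count=16)
    ring.new_writer()  # allocate in the parent, before forking
    procs = [multiprocessing.Process(target=producer, args=(ring,)),
             multiprocessing.Process(target=consumer, args=(ring,))]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
```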
+ """ + with self.lock: + if self.readers.value == 0: + raise InternalLockingError + self.readers.value -= 1 + self.writer_condition.notify() + self.readers_condition.wait() + self.readers.value += 1 diff --git a/test_newbackend/utils/README.md b/test_newbackend/utils/README.md new file mode 100644 index 0000000..8c4e9a9 --- /dev/null +++ b/test_newbackend/utils/README.md @@ -0,0 +1,10 @@ +# utilities + +**image_generator.py** +class `imgGenerator`: generator of numbered images, originally used for testing + +**opencv_cam.py** +contains functions to pretty-print opencv's VideoCapture object properties + +**fps.py** +simple class to calculate frames per second (or operations per second) \ No newline at end of file diff --git a/test_newbackend/utils/fps.py b/test_newbackend/utils/fps.py new file mode 100644 index 0000000..d6d06a8 --- /dev/null +++ b/test_newbackend/utils/fps.py @@ -0,0 +1,16 @@ +"""simple class to calc frames per second +https://stackoverflow.com/a/54539292/8615419 +""" + +import collections +import time + +class FPS: + def __init__(self, avarageof=50): + self.frametimestamps = collections.deque(maxlen=avarageof) + def __call__(self): + self.frametimestamps.append(time.time()) + if(len(self.frametimestamps) > 1): + return len(self.frametimestamps)/(self.frametimestamps[-1]-self.frametimestamps[0]) + else: + return 0.0 \ No newline at end of file diff --git a/test_newbackend/utils/image_generator.py b/test_newbackend/utils/image_generator.py new file mode 100644 index 0000000..a2ea5d0 --- /dev/null +++ b/test_newbackend/utils/image_generator.py @@ -0,0 +1,87 @@ +"""image_generator.py + +So I don't have to stare at my ugly mug all the time + +And is helpful to see if the ordering is as expected + +TODO: Add arg to return as float, currently always returns uint8 +""" + +import cv2 +import numpy as np + +class imgGenerator: + """generate sequences of numbered images""" + + def __init__(self, img_width: int=640, img_height: int=480, gray: bool=False): + + self.count = 0 + + self.img_width = img_width + self.img_height = img_height + self.gray = gray + + self.font = cv2.FONT_HERSHEY_SIMPLEX + self.thickness = 3 + self.size = self._get_optimize_font_size() + + + def _get_optimize_font_size(self): + + # arbitrarily chosen, but I want at least this number to fit on the image + n = 9999 + size = 0 + + # reversed so exit on the biggest text that would fit + for size in reversed(range(1, 20)): + + size_x, _ = cv2.getTextSize(str(n), self.font, size, self.thickness)[0] + + if (size_x > (self.img_width * .666)) and (size_x < (self.img_width * .75)): + break + + return size + + + def __next__(self): + + shape = (self.img_height, self.img_width, 1 if self.gray else 3) + img = np.ones(shape=shape) + + # center text on image + textsize = cv2.getTextSize(str(self.count), self.font, self.size, self.thickness)[0] + text_x = (self.img_width - textsize[0]) // 2 + text_y = (self.img_height + textsize[1]) // 2 + + cv2.putText( + img, + str(self.count), + (text_x, text_y), + self.font, + self.size, + (0,0,0), + thickness=self.thickness + ) + + self.count += 1 + return (img * 255).astype(np.uint8) + + +if __name__ == "__main__": + + ### example usage ### + img_gen = imgGenerator(img_width=1080, img_height=720) + + while True: + + img = next(img_gen) + + cv2.imshow("example", img) + + key = cv2.waitKey(500) + + # "q" or escape to quit + if key == ord("q") or key == 27: + break + + cv2.destroyAllWindows() \ No newline at end of file diff --git a/test_newbackend/utils/opencv_cam.py 
diff --git a/test_newbackend/utils/opencv_cam.py b/test_newbackend/utils/opencv_cam.py
new file mode 100644
index 0000000..1a28e1d
--- /dev/null
+++ b/test_newbackend/utils/opencv_cam.py
@@ -0,0 +1,31 @@
+import cv2
+
+def decode_fourcc(cc):
+    """
+    transform the FOURCC int code into a human-readable str
+    https://stackoverflow.com/a/49138893/8615419
+    """
+    return "".join([chr((int(cc) >> 8 * i) & 0xFF) for i in range(4)])
+
+def print_cap_props(capture: cv2.VideoCapture):
+
+    props = {
+        "CAP_PROP_FRAME_WIDTH": cv2.CAP_PROP_FRAME_WIDTH,
+        "CAP_PROP_FRAME_HEIGHT": cv2.CAP_PROP_FRAME_HEIGHT,
+        "CAP_PROP_FPS": cv2.CAP_PROP_FPS,
+        "CAP_PROP_FOURCC": cv2.CAP_PROP_FOURCC,
+        "CAP_PROP_BUFFERSIZE": cv2.CAP_PROP_BUFFERSIZE,
+        "CAP_PROP_CONVERT_RGB": cv2.CAP_PROP_CONVERT_RGB,
+        "CAP_PROP_AUTO_WB": cv2.CAP_PROP_AUTO_WB,
+        "CAP_PROP_BACKEND": cv2.CAP_PROP_BACKEND,
+        "CAP_PROP_CODEC_PIXEL_FORMAT": cv2.CAP_PROP_CODEC_PIXEL_FORMAT,
+        # "CAP_PROP_HW_ACCELERATION": cv2.CAP_PROP_HW_ACCELERATION,
+        # "CAP_PROP_HW_DEVICE": cv2.CAP_PROP_HW_DEVICE,
+        # "CAP_PROP_HW_ACCELERATION_USE_OPENCL": cv2.CAP_PROP_HW_ACCELERATION_USE_OPENCL,
+    }
+
+    for key, val in props.items():
+        if "FOURCC" in key:
+            print(f"{key:>35} ={decode_fourcc(capture.get(val)):>6}")
+        else:
+            print(f"{key:>35} ={capture.get(val):6.0f}")
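A quick usage sketch for these helpers, assuming a camera at index 0 and the same `utils` package layout that `main.py` imports from:

```python
# Dump the capture properties of the default camera using the helpers above.
import cv2
from utils.opencv_cam import print_cap_props, decode_fourcc

cap = cv2.VideoCapture(0)
print_cap_props(cap)
print(decode_fourcc(cap.get(cv2.CAP_PROP_FOURCC)))  # e.g. "YUYV" or "MJPG"
cap.release()
```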