Module dynamo.stream
Capture a stream of 3D scans from all connected Intel RealSense D4XX cameras
Distributed as a module of DynaMo: https://github.com/anderson-cu-bioastronautics/dynamo_realsense-capture
Source code
##########################################################################################################################################
## License: Apache 2.0. See LICENSE file in root directory. ##
##########################################################################################################################################
"""
Capture a stream of 3D scans from all connected Intel RealSense D4XX cameras
Distributed as a module of DynaMo: https://github.com/anderson-cu-bioastronautics/dynamo_realsense-capture
"""
import pyrealsense2 as rs
import pickle
import queue
import threading
import time
import os
import copy
import numpy as np
from .view import depthFrametoPC
def captureThread(q,deviceManager,stream_time):
"""
Function to capture frames from connected RealSense Cameras to memory
Parameters
----------
q : Queue object
Queue for storing captured frames
deviceManager : DeviceManager object
realsense_device_manager object which manages connections to all cameras
stream_time : float
Time in seconds for how long to collect data
"""
i=0
refSerial = list(deviceManager._enabled_devices.keys())[0]
fnumber = deviceManager.poll_frames()[refSerial].get_frame_number() #get frame number to ensure subsequent frames are saved
while i<int(90*stream_time):
frames = deviceManager.poll_frames()
newfnumber = frames[refSerial].get_frame_number()
if fnumber != newfnumber: #only save if frame has not already been saved
q.put(frames)
i+=1
print(str(newfnumber)+' '+str(i)+'/'+str(int(90*stream_time)))
fnumber = newfnumber
def processThread(q,devicesTransformation, saveDirectory):
"""
Function to process frames from queue to save to disk in pickle format
Parameters
----------
q : Queue object
Queue which stores captured frames
devicesTransformation : dict
dictionary with keys of camera's serial number holding dictionary of calibration parameters per camera
saveDirectory : str
Folder name in which to save frames, referenced to current working directory
"""
i=0
while not q.empty(): #while frames are still waiting to be processed
framesAll = q.get()
fname = saveDirectory+'\\'+str(format(i, '05d'))+'.pickle'
file = open(fname,'wb')
savedData={}
for device,frames in framesAll.items():
deviceData = {}
align = rs.align(rs.stream.depth) #setup rs.align object
alignedFrames = align.process(frames) #align frames
depthFrame = alignedFrames.get_depth_frame()
rsIntrinsics = depthFrame.get_profile().as_video_stream_profile().get_intrinsics()
deviceData['depth'] = copy.deepcopy(np.asanyarray(depthFrame.get_data())) #save depth frame
try:
infraredFrame = alignedFrames.get_infrared_frame(1)
deviceData['infrared'] = copy.deepcopy(np.asanyarray(infraredFrame.get_data())) #save infrared frame
except:
pass
try:
colorFrame = alignedFrames.get_color_frame()
deviceData['color'] = copy.deepcopy(np.asanyarray(colorFrame.get_data())) #save infrared frame
except:
pass
deviceData['intrinsics'] = {'ppx': rsIntrinsics.ppx, 'ppy':rsIntrinsics.ppy, 'fx':rsIntrinsics.fx, 'fy':rsIntrinsics.fy} #save relevant intrinsics
deviceData['poseMat'] = devicesTransformation[device][0] #save transformation matrix
savedData[device]=deviceData #save each camera's information in master dictionary for frame
pickle.dump(copy.deepcopy(savedData),file)
print('processed'+str(i))
i+=1
file.close()
q.task_done()
print('queue finished')
def single(deviceManager, devicesTransformation):
"""
Function to return single, aligned pointcloud from conneted Realsense Cameras
Parameters
----------
deviceManager : DeviceManager object
realsense_device_manager object which manages connections to all cameras
devicesTransformation : dict
dictionary with keys of camera's serial number holding dictionary of calibration parameters per camera
Returns
-------
allPoints : (n,6) array
array containing list of all points from all connected cameras
"""
frames = deviceManager.poll_frames()
allPoints = np.empty((0,6))
for camera,frame in frames.items():
deviceData = {}
align = rs.align(rs.stream.depth) #setup rs.align object
alignedFrames = align.process(frame) #align frames
depthFrame = alignedFrames.get_depth_frame()
rsIntrinsics = depthFrame.get_profile().as_video_stream_profile().get_intrinsics()
deviceData['depth'] = copy.deepcopy(np.asanyarray(depthFrame.get_data())) #save depth frame
try:
infraredFrame = alignedFrames.get_infrared_frame(1)
deviceData['infrared'] = copy.deepcopy(np.asanyarray(infraredFrame.get_data())) #save infrared frame
except:
pass
try:
colorFrame = alignedFrames.get_color_frame()
deviceData['color'] = copy.deepcopy(np.asanyarray(colorFrame.get_data())) #save infrared frame
except:
pass
deviceData['intrinsics'] = {'ppx': rsIntrinsics.ppx, 'ppy':rsIntrinsics.ppy, 'fx':rsIntrinsics.fx, 'fy':rsIntrinsics.fy} #save relevant intrinsics
deviceData['poseMat'] = devicesTransformation[camera][0] #save transformation matrix
points = depthFrametoPC(deviceData, format='rgb')
allPoints = np.append(allPoints, points, axis=0)
return allPoints
def start(deviceManager, deviceTransformations, saveDirectory, stream_time):
"""
Function to save aligned frames from connected RealSense Cameras to disk for future processing
Parameters
----------
deviceManager : DeviceManager object
realsense_device_manager object which manages connections to all cameras
devicesTransformation : dict
dictionary with keys of camera's serial number holding dictionary of calibration parameters per camera
saveDirectory : str
Folder name in which to save frames, referenced to current working directory
time : float
Time in seconds for how long to collect data
"""
q = queue.Queue(maxsize=0) #queue object to hold incoming frames
capture = threading.Thread(target=captureThread,args=(q,deviceManager,int(stream_time))) #thread which will capture frames
process = threading.Thread(target=processThread,args=(q,deviceTransformations,saveDirectory)) #thread which will save frames to disk
capture.start()
capture.join()
time.sleep(int(stream_time)+1) #delay to allow capture thread to finish, this may or may not be needed based on hardware specifications but is recommended to reduce dropped frames
process.start()
process.join()
q.join()
Functions
def captureThread(q, deviceManager, stream_time)-
Function to capture frames from connected RealSense Cameras to memory
Parameters
q:Queueobject- Queue for storing captured frames
deviceManager:DeviceManagerobject- realsense_device_manager object which manages connections to all cameras
stream_time:float- Time in seconds for how long to collect data
Source code
def captureThread(q,deviceManager,stream_time): """ Function to capture frames from connected RealSense Cameras to memory Parameters ---------- q : Queue object Queue for storing captured frames deviceManager : DeviceManager object realsense_device_manager object which manages connections to all cameras stream_time : float Time in seconds for how long to collect data """ i=0 refSerial = list(deviceManager._enabled_devices.keys())[0] fnumber = deviceManager.poll_frames()[refSerial].get_frame_number() #get frame number to ensure subsequent frames are saved while i<int(90*stream_time): frames = deviceManager.poll_frames() newfnumber = frames[refSerial].get_frame_number() if fnumber != newfnumber: #only save if frame has not already been saved q.put(frames) i+=1 print(str(newfnumber)+' '+str(i)+'/'+str(int(90*stream_time))) fnumber = newfnumber def processThread(q, devicesTransformation, saveDirectory)-
Function to process frames from queue to save to disk in pickle format
Parameters
q:Queueobject- Queue which stores captured frames
devicesTransformation:dict- dictionary with keys of camera's serial number holding dictionary of calibration parameters per camera
saveDirectory:str- Folder name in which to save frames, referenced to current working directory
Source code
def processThread(q,devicesTransformation, saveDirectory): """ Function to process frames from queue to save to disk in pickle format Parameters ---------- q : Queue object Queue which stores captured frames devicesTransformation : dict dictionary with keys of camera's serial number holding dictionary of calibration parameters per camera saveDirectory : str Folder name in which to save frames, referenced to current working directory """ i=0 while not q.empty(): #while frames are still waiting to be processed framesAll = q.get() fname = saveDirectory+'\\'+str(format(i, '05d'))+'.pickle' file = open(fname,'wb') savedData={} for device,frames in framesAll.items(): deviceData = {} align = rs.align(rs.stream.depth) #setup rs.align object alignedFrames = align.process(frames) #align frames depthFrame = alignedFrames.get_depth_frame() rsIntrinsics = depthFrame.get_profile().as_video_stream_profile().get_intrinsics() deviceData['depth'] = copy.deepcopy(np.asanyarray(depthFrame.get_data())) #save depth frame try: infraredFrame = alignedFrames.get_infrared_frame(1) deviceData['infrared'] = copy.deepcopy(np.asanyarray(infraredFrame.get_data())) #save infrared frame except: pass try: colorFrame = alignedFrames.get_color_frame() deviceData['color'] = copy.deepcopy(np.asanyarray(colorFrame.get_data())) #save infrared frame except: pass deviceData['intrinsics'] = {'ppx': rsIntrinsics.ppx, 'ppy':rsIntrinsics.ppy, 'fx':rsIntrinsics.fx, 'fy':rsIntrinsics.fy} #save relevant intrinsics deviceData['poseMat'] = devicesTransformation[device][0] #save transformation matrix savedData[device]=deviceData #save each camera's information in master dictionary for frame pickle.dump(copy.deepcopy(savedData),file) print('processed'+str(i)) i+=1 file.close() q.task_done() print('queue finished') def single(deviceManager, devicesTransformation)-
Function to return single, aligned pointcloud from conneted Realsense Cameras
Parameters
deviceManager:DeviceManagerobject- realsense_device_manager object which manages connections to all cameras
devicesTransformation:dict- dictionary with keys of camera's serial number holding dictionary of calibration parameters per camera
Returns
allPoints: (n,6)array- array containing list of all points from all connected cameras
Source code
def single(deviceManager, devicesTransformation): """ Function to return single, aligned pointcloud from conneted Realsense Cameras Parameters ---------- deviceManager : DeviceManager object realsense_device_manager object which manages connections to all cameras devicesTransformation : dict dictionary with keys of camera's serial number holding dictionary of calibration parameters per camera Returns ------- allPoints : (n,6) array array containing list of all points from all connected cameras """ frames = deviceManager.poll_frames() allPoints = np.empty((0,6)) for camera,frame in frames.items(): deviceData = {} align = rs.align(rs.stream.depth) #setup rs.align object alignedFrames = align.process(frame) #align frames depthFrame = alignedFrames.get_depth_frame() rsIntrinsics = depthFrame.get_profile().as_video_stream_profile().get_intrinsics() deviceData['depth'] = copy.deepcopy(np.asanyarray(depthFrame.get_data())) #save depth frame try: infraredFrame = alignedFrames.get_infrared_frame(1) deviceData['infrared'] = copy.deepcopy(np.asanyarray(infraredFrame.get_data())) #save infrared frame except: pass try: colorFrame = alignedFrames.get_color_frame() deviceData['color'] = copy.deepcopy(np.asanyarray(colorFrame.get_data())) #save infrared frame except: pass deviceData['intrinsics'] = {'ppx': rsIntrinsics.ppx, 'ppy':rsIntrinsics.ppy, 'fx':rsIntrinsics.fx, 'fy':rsIntrinsics.fy} #save relevant intrinsics deviceData['poseMat'] = devicesTransformation[camera][0] #save transformation matrix points = depthFrametoPC(deviceData, format='rgb') allPoints = np.append(allPoints, points, axis=0) return allPoints def start(deviceManager, deviceTransformations, saveDirectory, stream_time)-
Function to save aligned frames from connected RealSense Cameras to disk for future processing
Parameters
deviceManager:DeviceManagerobject- realsense_device_manager object which manages connections to all cameras
devicesTransformation:dict- dictionary with keys of camera's serial number holding dictionary of calibration parameters per camera
saveDirectory:str- Folder name in which to save frames, referenced to current working directory
time:float- Time in seconds for how long to collect data
Source code
def start(deviceManager, deviceTransformations, saveDirectory, stream_time): """ Function to save aligned frames from connected RealSense Cameras to disk for future processing Parameters ---------- deviceManager : DeviceManager object realsense_device_manager object which manages connections to all cameras devicesTransformation : dict dictionary with keys of camera's serial number holding dictionary of calibration parameters per camera saveDirectory : str Folder name in which to save frames, referenced to current working directory time : float Time in seconds for how long to collect data """ q = queue.Queue(maxsize=0) #queue object to hold incoming frames capture = threading.Thread(target=captureThread,args=(q,deviceManager,int(stream_time))) #thread which will capture frames process = threading.Thread(target=processThread,args=(q,deviceTransformations,saveDirectory)) #thread which will save frames to disk capture.start() capture.join() time.sleep(int(stream_time)+1) #delay to allow capture thread to finish, this may or may not be needed based on hardware specifications but is recommended to reduce dropped frames process.start() process.join() q.join()