# DynamicGeneration/video_utils.py
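"""Utilities for rendering object-movement schedules as a top-down video.

Given scene metadata, a camera pose, a background image, and a set of
reachable floor positions, Video_Generator plans a path for each moved
object, projects it into image coordinates, draws the trajectory and an
object icon onto the background, and writes the frames out with moviepy.
"""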
import json
import os
import cv2
import numpy as np
import math
import heapq
import datetime
from moviepy.editor import ImageSequenceClip
import tqdm
from utils import get_scene_dir_path, OBJECT_PICTURE_DIR
class Video_Generator:
def __init__(self):
self.frame_radius = 20
self.frame_thickness = 1
self.frame_outline_color = (0,0,0) #black
self.frame_filling_color = (255, 255, 255) #white
self.traj_count = {}
def get_img_coord(self, points, camera_pose, image_width, image_height):
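        """Project world-space points into 2D image coordinates for the scene camera."""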
transformation_matrix = compute_transformation_matrix(camera_pose)
camera_positions = world_to_camera(points, transformation_matrix)
        projected_points = project_to_2d(camera_positions, camera_pose, image_width, image_height)
        return projected_points
def draw_objframe(self, obj_type, point, background):
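        """Draw a callout bubble containing the object's icon, pointing at its image position."""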
image = background
obj_path = os.path.join(OBJECT_PICTURE_DIR, '{}.png'.format(obj_type))
if not os.path.exists(obj_path):
obj_path = os.path.join(OBJECT_PICTURE_DIR, 'Phone.png')
obj_img = cv2.imread(obj_path)
        # draw a circular callout bubble offset up-left of the object's position
        center = (int(point[0] - 1.2 * self.frame_radius), int(point[1] - 1.2 * self.frame_radius))
cv2.circle(image, center, self.frame_radius, self.frame_filling_color, -1)
cv2.circle(image, center, self.frame_radius, self.frame_outline_color, self.frame_thickness)
theta = np.pi/8
line_start1 = (int(center[0]+self.frame_radius*np.sin(theta)), int(center[1]+self.frame_radius*np.cos(theta)))
line_start2 = (int(center[0]+self.frame_radius*np.cos(theta)), int(center[1]+self.frame_radius*np.sin(theta)))
line_end = (int(center[0] + 1.2*self.frame_radius), int(center[1] + 1.2*self.frame_radius))
cv2.line(image, line_start1, line_end, self.frame_outline_color, self.frame_thickness)
cv2.line(image, line_start2, line_end, self.frame_outline_color, self.frame_thickness)
        cv2.circle(image, line_end, 3, (0, 0, 255), -1)  # red dot at the object's actual position
        # paste the object's icon into the bubble
obj_resized = cv2.resize(obj_img, (self.frame_radius, self.frame_radius))
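        # clip the icon region against the image borders so the paste below never
        # indexes outside either array (source and destination spans stay equal)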
        src_row_start = max(0, obj_resized.shape[0] // 2 - center[1])
        src_row_end = min(obj_resized.shape[0], obj_resized.shape[0] // 2 + image.shape[0] - center[1])
        src_col_start = max(0, obj_resized.shape[1] // 2 - center[0])
        src_col_end = min(obj_resized.shape[1], obj_resized.shape[1] // 2 + image.shape[1] - center[0])
        dst_row_start = max(0, center[1] - obj_resized.shape[0] // 2)
        dst_row_end = min(image.shape[0], center[1] + obj_resized.shape[0] // 2)
        dst_col_start = max(0, center[0] - obj_resized.shape[1] // 2)
        dst_col_end = min(image.shape[1], center[0] + obj_resized.shape[1] // 2)
        image[dst_row_start:dst_row_end, dst_col_start:dst_col_end] = obj_resized[src_row_start:src_row_end, src_col_start:src_col_end]
def add_description(self, activity_name, time, obj_list, receptacle_list, background):
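        """Stack a white text panel onto the right edge of the frame listing the activity, time, and object movements."""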
image = background.copy()
        description_width = 300
        description_bg = np.zeros((background.shape[0], description_width, 3), np.uint8)
        res = np.hstack((image, description_bg))
        font = cv2.FONT_HERSHEY_COMPLEX
        font_scale = 0.5
        font_color = (0, 0, 0)
        thickness = 1
        line_type = 8
        # the text panel starts at x = background.shape[1] (the image width) in the stacked result
        text_x = background.shape[1] + 10
        text_y = 50
        line_interval = 30
        cv2.rectangle(res, (background.shape[1], 0), (background.shape[1] + description_width, background.shape[0]), (255, 255, 255), -1)
texts = ['activity:', activity_name, 'time: ', str(time), 'object movement: ']
for i, text in enumerate(texts):
            if i % 2 == 0:
                # label lines flush left in black; value lines indented in red
                text_x = background.shape[1] + 10
                font_color = (0, 0, 0)
            else:
                text_x = background.shape[1] + 30
                font_color = (0, 0, 255)
            cv2.putText(res, text, (text_x, text_y + i * line_interval), font, font_scale, font_color, thickness, line_type)
start_line = 5
for i in range(len(obj_list)):
obj = obj_list[i].split('|')[0]
            recep = receptacle_list[i].split('|')[0]
            text_x = background.shape[1] + 120
            font_color = (0, 0, 255)
            # right-align the object name against the '->' arrow
            obj_text_size = cv2.getTextSize(obj, font, font_scale, thickness)[0][0]
            cv2.putText(res, obj, (text_x - 20 - obj_text_size, text_y + (start_line + i) * line_interval), font, font_scale, font_color, thickness, line_type)
            cv2.putText(res, '->', (text_x, text_y + (start_line + i) * line_interval), font, font_scale, font_color, thickness, line_type)
            cv2.putText(res, recep, (text_x + 40, text_y + (start_line + i) * line_interval), font, font_scale, font_color, thickness, line_type)
return res
def draw_traj(self, info, image):
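        """Draw one trajectory segment; repeatedly traversed segments get thicker lines."""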
last_point = info['last_point']
point = info['point']
is_end = info['end']
is_arrow = info['arrow']
radius = 3
next_point = (int(point[0]), int(point[1]))
        if last_point is None:
            # first point of a trajectory: mark the start in red
            cv2.circle(image, next_point, radius, (0, 0, 255), -1)
            return
pre_point = (int(last_point[0]), int(last_point[1]))
        line_color = (0, 0, 0)
        arrow_color = (0, 255, 0)
        arrow_thickness = 1
        # count traversals of this segment (direction-insensitive) so that
        # repeatedly used segments are drawn with thicker lines
        key = (pre_point, next_point) if pre_point <= next_point else (next_point, pre_point)
        count = self.traj_count.get(key, 0)
        self.traj_count[key] = count + 1
        step = 0.2
        line_thickness = min(int(1 + count * step), 5)
#draw
cv2.line(image, pre_point, next_point, line_color, line_thickness)
        if is_arrow:
            cv2.arrowedLine(image, pre_point, next_point, arrow_color, arrow_thickness, tipLength=1.5)
        if is_end:
            # mark the end of the trajectory in cyan
            cv2.circle(image, next_point, radius, (255, 255, 0), -1)
def get_multiobj_image(self, draw_infos, background):
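        """Render one frame per path step with every object moving in this time slot."""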
image_list = []
        if not draw_infos:
            return image_list, background
activity_name = draw_infos[0]['activity']
time = draw_infos[0]['time']
object_list = [info['object'] for info in draw_infos]
receptacle_list = [info['receptacle'] for info in draw_infos]
image_infos = []
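        # image_infos[k] collects, for every moved object, its drawing state at frame k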
for draw_info in draw_infos:
obj = draw_info['object'].split('|')[0]
points = draw_info['points']
last_point = None
for point_num, point in enumerate(points):
if point_num >= len(image_infos):
image_infos.append([])
image_infos[point_num].append({
'object':obj,
'point':point,
'last_point':last_point,
'end':point_num == len(points)-1,
'arrow':point_num == len(points)//3
})
last_point = (point[0], point[1])
image_with_traj = background.copy()
for image_info in image_infos:
#draw traj
for info in image_info:
self.draw_traj(info, image_with_traj)
#draw obj with frame
image = image_with_traj.copy()
for info in image_info:
self.draw_objframe(info['object'], info['point'], image)
image = self.add_description(activity_name, time, object_list, receptacle_list, image)
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  # moviepy expects RGB frames
image_list.append(image)
return image_list, image_with_traj
def get_receptacle_position_from_meta(self, receptacle_id, metadata):
position = None
objects = metadata['objects']
for obj in objects:
if obj['objectId'] == receptacle_id:
position = obj['position']
break
return position
def get_distance(self, point1, point2):
return math.sqrt((point1['x'] - point2['x'])**2 + (point1['z'] - point2['z'])**2)
def get_nearest_point(self, point, reachable_points):
        min_distance = float('inf')
nearest_point = None
for rp in reachable_points:
distance = self.get_distance(point, rp)
if distance < min_distance:
min_distance = distance
nearest_point = rp
return nearest_point
def get_path(self, start_position, end_position, reachable_positions):
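        """Plan a path with A* over the discrete grid of reachable positions.

        Neighbors are reachable positions within one grid step (0.25 m);
        the returned path includes the exact start and end world positions.
        """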
res = []
res.append(start_position)
start_point = self.get_nearest_point(start_position, reachable_positions)
target_point = self.get_nearest_point(end_position, reachable_positions)
point_id = 0
        open_list = [(0, point_id, start_point)]
        came_from = {(start_point['x'], start_point['z']): None}
        cost_so_far = {(start_point['x'], start_point['z']): 0}
while open_list:
current = heapq.heappop(open_list)[-1]
if current == target_point:
break
for next_point in reachable_positions:
dis = self.get_distance(current, next_point)
                if dis - 0.25 > 0.001:  # only expand neighbors within one 0.25 m grid step
                    continue
                new_cost = cost_so_far[(current['x'], current['z'])] + 1
                next_key = (next_point['x'], next_point['z'])
                if next_key not in cost_so_far or new_cost < cost_so_far[next_key]:
                    cost_so_far[next_key] = new_cost
                    # A* priority: path cost plus Manhattan-distance heuristic to the target
                    priority = new_cost + abs(next_point['x'] - target_point['x']) + abs(next_point['z'] - target_point['z'])
                    point_id += 1
                    heapq.heappush(open_list, (priority, point_id, next_point))
                    came_from[next_key] = current
path = []
current = target_point
while current != start_point:
path.append(current)
            current = came_from[(current['x'], current['z'])]
path.append(start_point)
path.reverse()
res.extend(path)
res.append(end_position)
return res
def init_obj_traj(self, metadata):
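        """Map each pickupable object to its current non-Floor parent receptacle (or None)."""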
res = {}
for obj in metadata['objects']:
if obj['pickupable']:
parentReceptacle = obj['parentReceptacles']
if parentReceptacle is not None and 'Floor' in parentReceptacle:
parentReceptacle.remove('Floor')
                res[obj['objectId']] = parentReceptacle[0] if parentReceptacle else None
return res
    def save_video(self, dynamic_info, background, camera_pose, metadata, reachable_positions, output_path):
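        """Plan, project, and render every scheduled object movement, then write the video to output_path."""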
object_traj = self.init_obj_traj(metadata)
self.traj_count = {}
image_list = []
paths_list = []
for time,timeinfo in tqdm.tqdm(dynamic_info.items()):
draw_infos = []
for info in timeinfo:
info['time'] = time
target_object_id = info['object']
target_receptacle_id = info['receptacle']
target_object_receptacle = object_traj[target_object_id]
if target_object_receptacle == target_receptacle_id:
continue
if target_object_receptacle is None:
for obj in metadata['objects']:
if obj['objectId'] == target_object_id:
start_position = obj['position']
break
else:
start_position = self.get_receptacle_position_from_meta(target_object_receptacle, metadata)
end_position = self.get_receptacle_position_from_meta(target_receptacle_id, metadata)
                path = self.get_path(start_position, end_position, reachable_positions)  # path includes the start and end positions
image_width = 300
image_height = 300
img_path = self.get_img_coord(path, camera_pose, image_width, image_height)
paths_list.append(img_path)
draw_info = {
'time':time,
'activity':info['activity'],
'object':target_object_id,
'receptacle':target_receptacle_id,
'points':img_path,
}
draw_infos.append(draw_info)
object_traj[target_object_id] = target_receptacle_id
time_images,image_with_traj = self.get_multiobj_image(draw_infos, background)
background = image_with_traj
image_list.extend(time_images)
clip = ImageSequenceClip(image_list, fps=30)
clip.write_videofile(str(output_path), fps=30, codec="libx264")
def get_dynamic_info(self, schedules):
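        """Flatten day-indexed schedules into a {datetime: [movement, ...]} mapping."""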
res = {}
for day, day_schedules in schedules.items():
for activity in day_schedules:
activity_name = activity['activity']
start_time = activity['start_time']
content = activity['content']
for c in content:
c['activity'] = activity_name
time = datetime.datetime.combine(day, start_time)
                res.setdefault(time, []).extend(content)
return res
    def generate(self, schedules, scene_file_name, video_path):
        metadata, camera_pose, background, reachable_points = read_scene(scene_file_name)
        # only visualize the first scheduled day
        schedule_keys = list(schedules.keys())
        schedules_filter = {schedule_keys[0]: schedules[schedule_keys[0]]}
        dynamic_info = self.get_dynamic_info(schedules_filter)
        self.save_video(dynamic_info, background, camera_pose, metadata, reachable_points, video_path)
def read_scene(scene_file_name):
    data_dir = get_scene_dir_path(scene_file_name)
    with open(os.path.join(data_dir, 'metadata.json'), 'r', encoding='utf-8') as f:
        metadata = json.load(f)
    with open(os.path.join(data_dir, 'camera_pose.json'), 'r', encoding='utf-8') as f:
        camera_pose = json.load(f)
    background = cv2.imread(os.path.join(data_dir, 'background.png'))
    with open(os.path.join(data_dir, 'reachablePositions.json'), 'r', encoding='utf-8') as f:
        reachable_points = json.load(f)
    return metadata, camera_pose, background, reachable_points
def compute_transformation_matrix(camera_pose):
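    """Build a 4x4 world-to-camera transform from the camera pose.

    Only the pitch (rotation about the x axis) is applied; y/z rotation
    components of the pose are ignored, which suffices for a camera that
    only tilts downward.
    """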
position = camera_pose['position']
rotation = camera_pose['rotation']
translation_matrix = np.array([
[1, 0, 0, -position['x']],
[0, 1, 0, -position['y']],
[0, 0, 1, -position['z']],
[0, 0, 0, 1]
])
theta_x = np.radians(rotation['x'])
rotation_matrix_x = np.array([
[1, 0, 0, 0],
[0, np.cos(theta_x), -np.sin(theta_x), 0],
[0, np.sin(theta_x), np.cos(theta_x), 0],
[0, 0, 0, 1]
])
transformation_matrix = np.dot(rotation_matrix_x, translation_matrix)
return transformation_matrix
def world_to_camera(positions, transformation_matrix):
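    """Transform world-space point dicts into homogeneous camera-space vectors."""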
camera_positions = []
for pos in positions:
world_pos = np.array([pos['x'], pos['y'], pos['z'], 1])
camera_pos = np.dot(transformation_matrix, world_pos)
camera_positions.append(camera_pos)
return camera_positions
def project_to_2d(camera_positions, camera_pose, image_width, image_height):
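    """Perspective-project camera-space points into pixel coordinates."""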
fov = camera_pose['fieldOfView']
aspect_ratio = image_width / image_height
f = 1 / np.tan(np.radians(fov) / 2)
projection_matrix = np.array([
[f / aspect_ratio, 0, 0, 0],
[0, f, 0, 0],
[0, 0, 1, 0]
])
projected_points = []
for pos in camera_positions:
projected = np.dot(projection_matrix, pos)
        projected /= projected[2]  # perspective divide by depth
x = (projected[0] + 1) * image_width / 2
y = (1 - projected[1]) * image_height / 2
        x = image_width - x  # mirror horizontally
projected_points.append((x, y))
return projected_points
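

if __name__ == '__main__':
    # Minimal sketch of the projection pipeline on synthetic data. The camera
    # pose and points below are made up for illustration; real values come
    # from camera_pose.json / metadata.json via read_scene.
    demo_pose = {
        'position': {'x': 0.0, 'y': 3.0, 'z': 0.0},
        'rotation': {'x': 90.0, 'y': 0.0, 'z': 0.0},  # pitched straight down
        'fieldOfView': 90.0,
    }
    demo_points = [
        {'x': 0.0, 'y': 0.0, 'z': 0.0},  # directly below the camera
        {'x': 1.0, 'y': 0.0, 'z': 1.0},
    ]
    matrix = compute_transformation_matrix(demo_pose)
    cam_points = world_to_camera(demo_points, matrix)
    pixels = project_to_2d(cam_points, demo_pose, 300, 300)
    for world, pix in zip(demo_points, pixels):
        print(world, '->', pix)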