#!/usr/bin/env python3 # Copyright (c) Facebook, Inc. and its affiliates. # This source code is licensed under the MIT license found in the # LICENSE file in the root directory of this source tree. import os from typing import Dict, List, Optional, Tuple import cv2 import imageio import numpy as np import tqdm from habitat.utils.visualizations import maps def paste_overlapping_image( background: np.ndarray, foreground: np.ndarray, location: Tuple[int, int], mask: Optional[np.ndarray] = None, ): r"""Composites the foreground onto the background dealing with edge boundaries. Args: background: the background image to paste on. foreground: the image to paste. Can be RGB or RGBA. If using alpha blending, values for foreground and background should both be between 0 and 255. Otherwise behavior is undefined. location: the image coordinates to paste the foreground. mask: If not None, a mask for deciding what part of the foreground to use. Must be the same size as the foreground if provided. Returns: The modified background image. This operation is in place. """ assert mask is None or mask.shape[:2] == foreground.shape[:2] foreground_size = foreground.shape[:2] min_pad = ( max(0, foreground_size[0] // 2 - location[0]), max(0, foreground_size[1] // 2 - location[1]), ) max_pad = ( max( 0, (location[0] + (foreground_size[0] - foreground_size[0] // 2)) - background.shape[0], ), max( 0, (location[1] + (foreground_size[1] - foreground_size[1] // 2)) - background.shape[1], ), ) background_patch = background[ (location[0] - foreground_size[0] // 2 + min_pad[0]) : ( location[0] + (foreground_size[0] - foreground_size[0] // 2) - max_pad[0] ), (location[1] - foreground_size[1] // 2 + min_pad[1]) : ( location[1] + (foreground_size[1] - foreground_size[1] // 2) - max_pad[1] ), ] foreground = foreground[ min_pad[0] : foreground.shape[0] - max_pad[0], min_pad[1] : foreground.shape[1] - max_pad[1], ] if foreground.size == 0 or background_patch.size == 0: # Nothing to do, no overlap. return background if mask is not None: mask = mask[ min_pad[0] : foreground.shape[0] - max_pad[0], min_pad[1] : foreground.shape[1] - max_pad[1], ] if foreground.shape[2] == 4: # Alpha blending foreground = ( background_patch.astype(np.int32) * (255 - foreground[:, :, [3]]) + foreground[:, :, :3].astype(np.int32) * foreground[:, :, [3]] ) // 255 if mask is not None: background_patch[mask] = foreground[mask] else: background_patch[:] = foreground return background def images_to_video( images: List[np.ndarray], output_dir: str, video_name: str, fps: int = 10, quality: Optional[float] = 5, **kwargs ): r"""Calls imageio to run FFMPEG on a list of images. For more info on parameters, see https://imageio.readthedocs.io/en/stable/format_ffmpeg.html Args: images: The list of images. Images should be HxWx3 in RGB order. output_dir: The folder to put the video in. video_name: The name for the video. fps: Frames per second for the video. Not all values work with FFMPEG, use at your own risk. quality: Default is 5. Uses variable bit rate. Highest quality is 10, lowest is 0. Set to None to prevent variable bitrate flags to FFMPEG so you can manually specify them using output_params instead. Specifying a fixed bitrate using ‘bitrate’ disables this parameter. """ assert 0 <= quality <= 10 if not os.path.exists(output_dir): os.makedirs(output_dir) video_name = video_name.replace(" ", "_").replace("\n", "_") + ".mp4" writer = imageio.get_writer( os.path.join(output_dir, video_name), fps=fps, quality=quality, **kwargs ) for im in tqdm.tqdm(images): writer.append_data(im) writer.close() def draw_collision(view: np.ndarray, alpha: float = 0.4) -> np.ndarray: r"""Draw translucent red strips on the border of input view to indicate a collision has taken place. Args: view: input view of size HxWx3 in RGB order. alpha: Opacity of red collision strip. 1 is completely non-transparent. Returns: A view with collision effect drawn. """ size = view.shape[0] strip_width = size // 20 mask = np.ones((size, size)) mask[strip_width:-strip_width, strip_width:-strip_width] = 0 mask = mask == 1 view[mask] = (alpha * np.array([255, 0, 0]) + (1.0 - alpha) * view)[mask] return view def observations_to_image(observation: Dict, info: Dict) -> np.ndarray: r"""Generate image of single frame from observation and info returned from a single environment step(). Args: observation: observation returned from an environment step(). info: info returned from an environment step(). Returns: generated image of a single frame. """ observation_size = observation["rgb"].shape[0] egocentric_view = observation["rgb"][:, :, :3] # draw collision if "collisions" in info and info["collisions"]["is_collision"]: egocentric_view = draw_collision(egocentric_view) # draw depth map if observation has depth info if "depth" in observation: depth_map = (observation["depth"].squeeze() * 255).astype(np.uint8) depth_map = np.stack([depth_map for _ in range(3)], axis=2) egocentric_view = np.concatenate((egocentric_view, depth_map), axis=1) frame = egocentric_view if "top_down_map" in info: top_down_map = info["top_down_map"]["map"] top_down_map = maps.colorize_topdown_map(top_down_map) map_agent_pos = info["top_down_map"]["agent_map_coord"] top_down_map = maps.draw_agent( image=top_down_map, agent_center_coord=map_agent_pos, agent_rotation=info["top_down_map"]["agent_angle"], agent_radius_px=top_down_map.shape[0] // 16, ) if top_down_map.shape[0] > top_down_map.shape[1]: top_down_map = np.rot90(top_down_map, 1) # scale top down map to align with rgb view old_h, old_w, _ = top_down_map.shape top_down_height = observation_size top_down_width = int(float(top_down_height) / old_h * old_w) # cv2 resize (dsize is width first) top_down_map = cv2.resize( top_down_map, (top_down_width, top_down_height), interpolation=cv2.INTER_CUBIC, ) frame = np.concatenate((egocentric_view, top_down_map), axis=1) return frame