#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import os
from typing import Dict, List, Optional, Tuple

import cv2
import imageio
import numpy as np
import tqdm

from habitat.utils.visualizations import maps


def paste_overlapping_image(
background: np.ndarray,
foreground: np.ndarray,
location: Tuple[int, int],
mask: Optional[np.ndarray] = None,
):
r"""Composites the foreground onto the background dealing with edge
boundaries.
Args:
background: the background image to paste on.
foreground: the image to paste. Can be RGB or RGBA. If using alpha
blending, values for foreground and background should both be
between 0 and 255. Otherwise behavior is undefined.
location: the image coordinates to paste the foreground.
mask: If not None, a mask for deciding what part of the foreground to
use. Must be the same size as the foreground if provided.
Returns:
The modified background image. This operation is in place.
"""
assert mask is None or mask.shape[:2] == foreground.shape[:2]
foreground_size = foreground.shape[:2]
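    # Rows/columns of the foreground that would extend past the background's
    # top/left edges (and therefore must be cropped off).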
min_pad = (
max(0, foreground_size[0] // 2 - location[0]),
max(0, foreground_size[1] // 2 - location[1]),
)
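    # Likewise for the background's bottom/right edges.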
max_pad = (
max(
0,
(location[0] + (foreground_size[0] - foreground_size[0] // 2))
- background.shape[0],
),
max(
0,
(location[1] + (foreground_size[1] - foreground_size[1] // 2))
- background.shape[1],
),
)
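    # View into the background region that the cropped foreground will cover.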
background_patch = background[
(location[0] - foreground_size[0] // 2 + min_pad[0]) : (
location[0]
+ (foreground_size[0] - foreground_size[0] // 2)
- max_pad[0]
),
(location[1] - foreground_size[1] // 2 + min_pad[1]) : (
location[1]
+ (foreground_size[1] - foreground_size[1] // 2)
- max_pad[1]
),
]
foreground = foreground[
min_pad[0] : foreground.shape[0] - max_pad[0],
min_pad[1] : foreground.shape[1] - max_pad[1],
]
if foreground.size == 0 or background_patch.size == 0:
# Nothing to do, no overlap.
return background
    if mask is not None:
        # Crop the mask with the same offsets. Use the mask's own shape here:
        # ``foreground`` has already been cropped above, but ``mask`` has not.
        mask = mask[
            min_pad[0] : mask.shape[0] - max_pad[0],
            min_pad[1] : mask.shape[1] - max_pad[1],
        ]
if foreground.shape[2] == 4:
# Alpha blending
foreground = (
background_patch.astype(np.int32) * (255 - foreground[:, :, [3]])
+ foreground[:, :, :3].astype(np.int32) * foreground[:, :, [3]]
) // 255
if mask is not None:
background_patch[mask] = foreground[mask]
else:
background_patch[:] = foreground
return background
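

# Usage sketch (illustrative, not part of the original module): pasting a
# small sprite so it hangs off the background's top-left corner. Only the
# overlapping region is written; the off-image part is cropped away.
def _example_paste_overlapping_image():
    background = np.zeros((100, 100, 3), dtype=np.uint8)
    sprite = np.full((20, 20, 3), 255, dtype=np.uint8)
    # Centering the 20x20 sprite at (5, 5) clips its top and left edges.
    paste_overlapping_image(background, sprite, location=(5, 5))
    assert background[:15, :15].all()  # the visible quadrant was drawn
    assert not background[20:, 20:].any()  # pixels beyond it stay untouched
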
def images_to_video(
images: List[np.ndarray],
output_dir: str,
video_name: str,
fps: int = 10,
quality: Optional[float] = 5,
**kwargs
):
r"""Calls imageio to run FFMPEG on a list of images. For more info on
parameters, see https://imageio.readthedocs.io/en/stable/format_ffmpeg.html
Args:
images: The list of images. Images should be HxWx3 in RGB order.
output_dir: The folder to put the video in.
video_name: The name for the video.
fps: Frames per second for the video. Not all values work with FFMPEG,
use at your own risk.
quality: Default is 5. Uses variable bit rate. Highest quality is 10,
lowest is 0. Set to None to prevent variable bitrate flags to
FFMPEG so you can manually specify them using output_params
instead. Specifying a fixed bitrate using ‘bitrate’ disables
this parameter.
"""
assert 0 <= quality <= 10
if not os.path.exists(output_dir):
os.makedirs(output_dir)
video_name = video_name.replace(" ", "_").replace("\n", "_") + ".mp4"
writer = imageio.get_writer(
os.path.join(output_dir, video_name),
fps=fps,
quality=quality,
**kwargs
)
for im in tqdm.tqdm(images):
writer.append_data(im)
writer.close()
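

# Usage sketch (illustrative, not part of the original module): writing a
# short synthetic fade clip. Assumes imageio's FFMPEG backend is installed.
def _example_images_to_video():
    frames = [np.full((64, 64, 3), i * 25, dtype=np.uint8) for i in range(10)]
    # Produces /tmp/video_demo/fade.mp4 at the default quality of 5.
    images_to_video(frames, "/tmp/video_demo", "fade", fps=10)
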
def draw_collision(view: np.ndarray, alpha: float = 0.4) -> np.ndarray:
r"""Draw translucent red strips on the border of input view to indicate
a collision has taken place.
Args:
view: input view of size HxWx3 in RGB order.
alpha: Opacity of red collision strip. 1 is completely non-transparent.
Returns:
A view with collision effect drawn.
"""
size = view.shape[0]
strip_width = size // 20
mask = np.ones((size, size))
mask[strip_width:-strip_width, strip_width:-strip_width] = 0
mask = mask == 1
view[mask] = (alpha * np.array([255, 0, 0]) + (1.0 - alpha) * view)[mask]
return view
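

# Usage sketch: draw_collision blends the red strip into the array it is
# given, so pass a copy when the original frame must be preserved.
def _example_draw_collision():
    frame = np.zeros((128, 128, 3), dtype=np.uint8)
    flagged = draw_collision(frame.copy(), alpha=0.4)
    assert flagged[0, 0, 0] > 0  # border pixel tinted red
    assert not frame.any()  # original frame untouched
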
def observations_to_image(observation: Dict, info: Dict) -> np.ndarray:
r"""Generate image of single frame from observation and info
returned from a single environment step().
Args:
observation: observation returned from an environment step().
info: info returned from an environment step().
Returns:
generated image of a single frame.
"""
observation_size = observation["rgb"].shape[0]
egocentric_view = observation["rgb"][:, :, :3]
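    # Note: the slice above is a numpy view, so draw_collision below writes
    # its red border back into observation["rgb"] as well.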
# draw collision
if "collisions" in info and info["collisions"]["is_collision"]:
egocentric_view = draw_collision(egocentric_view)
# draw depth map if observation has depth info
if "depth" in observation:
depth_map = (observation["depth"].squeeze() * 255).astype(np.uint8)
depth_map = np.stack([depth_map for _ in range(3)], axis=2)
egocentric_view = np.concatenate((egocentric_view, depth_map), axis=1)
frame = egocentric_view
if "top_down_map" in info:
top_down_map = info["top_down_map"]["map"]
top_down_map = maps.colorize_topdown_map(top_down_map)
map_agent_pos = info["top_down_map"]["agent_map_coord"]
top_down_map = maps.draw_agent(
image=top_down_map,
agent_center_coord=map_agent_pos,
agent_rotation=info["top_down_map"]["agent_angle"],
agent_radius_px=top_down_map.shape[0] // 16,
)
if top_down_map.shape[0] > top_down_map.shape[1]:
top_down_map = np.rot90(top_down_map, 1)
# scale top down map to align with rgb view
old_h, old_w, _ = top_down_map.shape
top_down_height = observation_size
top_down_width = int(float(top_down_height) / old_h * old_w)
# cv2 resize (dsize is width first)
top_down_map = cv2.resize(
top_down_map,
(top_down_width, top_down_height),
interpolation=cv2.INTER_CUBIC,
)
frame = np.concatenate((egocentric_view, top_down_map), axis=1)
return frame
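

# Usage sketch (hand-built inputs, names illustrative): without top-down map
# info, the frame is just the RGB view with the depth map tiled alongside it.
def _example_observations_to_image():
    observation = {
        "rgb": np.zeros((256, 256, 3), dtype=np.uint8),
        "depth": np.random.rand(256, 256, 1).astype(np.float32),
    }
    info = {}  # no collisions or top_down_map entries
    frame = observations_to_image(observation, info)
    assert frame.shape == (256, 512, 3)  # rgb and depth side by side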