Source code for torchpack.callbacks.trackers

import multiprocessing as mp
import os
import time
from queue import Empty
from typing import List, Optional

import numpy as np
import torch.cuda
from tensorpack.utils.concurrency import (ensure_proc_terminate,
                                          start_proc_mask_signal)
from tensorpack.utils.nvml import NVMLContext

from torchpack.callbacks.callback import Callback
from torchpack.utils.logging import logger

__all__ = ['GPUUtilizationTracker', 'ThroughputTracker']


[docs]class GPUUtilizationTracker(Callback): """ Track the average GPU utilization within an epoch. It will start a process to track GPU utilization through NVML every second within the epoch (the time of `trigger_epoch` is not included). This callback creates a process, therefore it is not safe to be used with MPI. """ master_only: bool = True def __init__(self, *, devices: Optional[List[int]] = None) -> None: if devices is not None: self.devices = devices else: env = os.environ.get('CUDA_VISIBLE_DEVICES') if env: self.devices = list(map(int, env.split(','))) elif env is None: self.devices = list(range(torch.cuda.device_count())) if len(self.devices) > 1: logger.warning( 'Neither `devices` nor `CUDA_VISIBLE_DEVICES` is set! ' 'All {} visible GPUs will be monitored.'.format( len(self.devices))) else: raise RuntimeError('No GPU device is specified!') @staticmethod def _worker(devices, queue, event): try: with NVMLContext() as ctx: while True: event.wait() event.clear() meters = [] while not event.is_set(): time.sleep(1) meters.append([ ctx.device(k).utilization()['gpu'] for k in devices ]) meters = meters[:max(len(meters) - 1, 1)] queue.put(np.mean(meters, axis=0)) event.clear() except: queue.put(None) def _before_train(self) -> None: self.queue = mp.Queue() self.event = mp.Event() self.process = mp.Process(target=self._worker, args=(self.devices, self.queue, self.event)) ensure_proc_terminate(self.process) start_proc_mask_signal(self.process) def _before_epoch(self) -> None: while self.event.is_set(): pass self.event.set() def _after_epoch(self) -> None: while self.event.is_set(): pass self.event.set() def _trigger_epoch(self) -> None: try: meters = self.queue.get(timeout=60) except Empty: logger.exception('Error occurred in `GPUUtilizationTracker`.') return self.trainer.summary.add_scalar('utilization/gpu', np.mean(meters)) if len(self.devices) > 1: for k, device in enumerate(self.devices): self.trainer.summary.add_scalar( 'utilization/gpu{}'.format(device), meters[k]) def _after_train(self) -> None: if self.process.is_alive(): self.process.terminate()
[docs]class ThroughputTracker(Callback): """ Track the throughput within an epoch (the time of `trigger_epoch` is not included). """ master_only: bool = True def __init__(self, *, samples_per_step: Optional[int] = None) -> None: self.samples_per_step = samples_per_step def _before_train(self) -> None: self.last_step = self.trainer.global_step def _before_epoch(self) -> None: self.start_time = time.perf_counter() def _after_epoch(self) -> None: self.end_time = time.perf_counter() def _trigger_epoch(self) -> None: steps_per_sec = (self.trainer.global_step - self.last_step) / (self.end_time - self.start_time) self.last_step = self.trainer.global_step if self.samples_per_step is None: self.trainer.summary.add_scalar('throughput/steps_per_sec', steps_per_sec) else: samples_per_sec = steps_per_sec * self.samples_per_step self.trainer.summary.add_scalar('throughput/samples_per_sec', samples_per_sec)