aigve.metrics

This module provides the video evaluation metrics that can be used within the AIGVE toolkit.

BlipSimScore

Bases: BaseMetric

Initialize the BLIPSimScore evaluator.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `model_name` | `str` | The name of the BLIP model. Defaults to `Salesforce/blip-itm-base-coco`. | `'Salesforce/blip-itm-base-coco'` |
| `logit_scale` | `bool` | Whether to calculate the cosine similarity as logits. Defaults to `False`. | `False` |

Source code in aigve/metrics/text_video_alignment/similarity_based/blipscore/blipsim.py
@METRICS.register_module()
class BlipSimScore(BaseMetric):
    """ Initialize the ``BLIPSimScore`` evaluator.

    Args:
        model_name (str): The name of the BLIP model. Defaults to ``Salesforce/blip-itm-base-coco``.
        logit_scale (bool): Whether to calculate the cosine similarity as logits. Defaults to False.
    """
    def __init__(self,
                 model_name: str = "Salesforce/blip-itm-base-coco",
                 logit_scale: bool = False,
                 ) -> None:
        super().__init__()
        self.model_name = model_name
        self.logit_scale = logit_scale

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = BlipForImageTextRetrieval.from_pretrained(self.model_name).to(self.device)
        self.model.eval()


    def process(self, data_batch: Sequence, data_samples: Sequence) -> None:
        """BLIPSimScore process
        Process one batch of data samples and predictions. The processed
        results should be stored in ``self.results``, which will be used to
        compute the metrics when all batches have been processed.

        Args:
            data_batch (Sequence): A batch of data from the dataloader.
            data_samples (Sequence): A batch of data samples that
                contain annotations and predictions.
        """

        result = dict()

        input_prompts, input_videos = data_samples  
        bsz = len(input_prompts)

        # Ensure prompt_input is a tensor
        if isinstance(input_prompts, tuple):
            input_prompts = list(input_prompts)

        if isinstance(input_videos, tuple):
            input_videos = list(input_videos)


        # Initialize an empty tensor to store the concatenated features
        blip_score_sum, blip_score_cnt = 0, 0
        logit_scale = self.model.logit_scale.exp() if self.logit_scale else 1
        with torch.no_grad():
            for input_prompt, input_frames in zip(input_prompts, input_videos):
                input_prompt = input_prompt.to(self.device)
                input_frames = input_frames.to(self.device)
                blip_cosine_sim_score = self.model(input_ids=input_prompt, pixel_values=input_frames, use_itm_head=False)[0].mean().item()
                blip_cosine_sim_score *= logit_scale
                print('current blip cosine similarity score', blip_cosine_sim_score)
                blip_score_sum += blip_cosine_sim_score
                blip_score_cnt += 1

        # Calculate the average BLIP score across all frames
        blip_score_frames_avg = blip_score_sum/blip_score_cnt

        result['blip_sim_score'] = blip_score_frames_avg

        self.results.append(result)


    def compute_metrics(self, results: list) -> Dict[str, float]:
        """Compute the metrics from processed results.

        Args:
            results (list): The processed results of each batch.

        Returns:
            Dict[str, float]: The computed metrics. The keys are the names of
            the metrics, and the values are corresponding results.
        """
        logger: MMLogger = MMLogger.get_current_instance()

        blip_score_np = np.zeros(len(results))
        for i, result in enumerate(results):
            blip_score_np[i] = result['blip_sim_score']

        blip_sim_mean = np.mean(blip_score_np) 

        print("Test results: blip similarity score={:.4f}"
              .format(blip_sim_mean))

        return {'blip_sim_score': blip_sim_mean}

compute_metrics(results)

Compute the metrics from processed results.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `results` | `list` | The processed results of each batch. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Dict[str, float]` | The computed metrics. The keys are the names of the metrics, and the values are corresponding results. |

Source code in aigve/metrics/text_video_alignment/similarity_based/blipscore/blipsim.py
def compute_metrics(self, results: list) -> Dict[str, float]:
    """Compute the metrics from processed results.

    Args:
        results (list): The processed results of each batch.

    Returns:
        Dict[str, float]: The computed metrics. The keys are the names of
        the metrics, and the values are corresponding results.
    """
    logger: MMLogger = MMLogger.get_current_instance()

    blip_score_np = np.zeros(len(results))
    for i, result in enumerate(results):
        blip_score_np[i] = result['blip_sim_score']

    blip_sim_mean = np.mean(blip_score_np) 

    print("Test results: blip similarity score={:.4f}"
          .format(blip_sim_mean))

    return {'blip_sim_score': blip_sim_mean}

process(data_batch, data_samples)

BLIPSimScore process. Process one batch of data samples and predictions. The processed results should be stored in self.results, which will be used to compute the metrics when all batches have been processed.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data_batch` | `Sequence` | A batch of data from the dataloader. | *required* |
| `data_samples` | `Sequence` | A batch of data samples that contain annotations and predictions. | *required* |

Source code in aigve/metrics/text_video_alignment/similarity_based/blipscore/blipsim.py
def process(self, data_batch: Sequence, data_samples: Sequence) -> None:
    """BLIPSimScore process
    Process one batch of data samples and predictions. The processed
    results should be stored in ``self.results``, which will be used to
    compute the metrics when all batches have been processed.

    Args:
        data_batch (Sequence): A batch of data from the dataloader.
        data_samples (Sequence): A batch of data samples that
            contain annotations and predictions.
    """

    result = dict()

    input_prompts, input_videos = data_samples  
    bsz = len(input_prompts)

    # Ensure prompt_input is a tensor
    if isinstance(input_prompts, tuple):
        input_prompts = list(input_prompts)

    if isinstance(input_videos, tuple):
        input_videos = list(input_videos)


    # Initialize an empty tensor to store the concatenated features
    blip_score_sum, blip_score_cnt = 0, 0
    logit_scale = self.model.logit_scale.exp() if self.logit_scale else 1
    with torch.no_grad():
        for input_prompt, input_frames in zip(input_prompts, input_videos):
            input_prompt = input_prompt.to(self.device)
            input_frames = input_frames.to(self.device)
            blip_cosine_sim_score = self.model(input_ids=input_prompt, pixel_values=input_frames, use_itm_head=False)[0].mean().item()
            blip_cosine_sim_score *= logit_scale
            print('current blip cosine similarity score', blip_cosine_sim_score)
            blip_score_sum += blip_cosine_sim_score
            blip_score_cnt += 1

    # Calculate the average BLIP score across all frames
    blip_score_frames_avg = blip_score_sum/blip_score_cnt

    result['blip_sim_score'] = blip_score_frames_avg

    self.results.append(result)
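
The snippet below is a minimal usage sketch (not part of the AIGVE source, and assuming `BlipSimScore` has been imported from the module above): one dummy tokenized prompt and one dummy 4-frame video, already shaped like BLIP `input_ids` and `pixel_values`, are fed through `process`, and the aggregated score is read from `compute_metrics`. In practice the AIGVE dataloader supplies these tensors.

```python
import torch

metric = BlipSimScore()                        # model_name defaults to Salesforce/blip-itm-base-coco
prompt_ids = torch.randint(0, 30522, (1, 16))  # [1, seq_len] dummy input_ids
frames = torch.rand(4, 3, 384, 384)            # [T, C, H, W] dummy pixel_values (BLIP expects 384x384)

# data_samples is a (prompts, videos) pair; each element holds one tensor per video.
metric.process(data_batch=None, data_samples=([prompt_ids], [frames]))
print(metric.compute_metrics(metric.results))  # {'blip_sim_score': ...}
```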

CLIPSimScore

Bases: BaseMetric

Initialize the CLIPSimScore evaluator.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `processor_name` | `str` | The name of the CLIP processor, which wraps a CLIP feature extractor and a CLIP tokenizer into a single processor. Defaults to `openai/clip-vit-base-patch32`. | `'openai/clip-vit-base-patch32'` |
| `model_name` | `str` | The name of the CLIP model. Defaults to `openai/clip-vit-base-patch32`. | `'openai/clip-vit-base-patch32'` |
| `logit_scale` | `bool` | Whether to calculate the cosine similarity as logits. Defaults to `False`. | `False` |

Source code in aigve/metrics/text_video_alignment/similarity_based/clipscore/clipsim.py
@METRICS.register_module()
class CLIPSimScore(BaseMetric):
    """ Initialize the ``CLIPSimScore`` evaluator.

    Args:
        processor_name (str): The name of the CLIP processor, which wraps a CLIP feature extractor and a CLIP tokenizer into a single processor. 
                                Defaults to ``openai/clip-vit-base-patch32``.
        model_name (str): The name of the CLIP model. Defaults to ``openai/clip-vit-base-patch32``.
        logit_scale (bool): Whether to calculate the cosine similarity as logits. Defaults to False.
    """
    def __init__(self,
                 processor_name: str = "openai/clip-vit-base-patch32",
                 model_name: str = "openai/clip-vit-base-patch32",
                 logit_scale: bool = False,
                #  train_index: int = 4
                 ) -> None:
        super().__init__()
        self.processor_name = processor_name
        self.model_name = model_name
        self.logit_scale = logit_scale

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.processor = AutoProcessor.from_pretrained(self.processor_name)
        self.model = CLIPModel.from_pretrained(self.model_name).to(self.device)
        self.model.eval()

    def process(self, data_batch: Sequence, data_samples: Sequence) -> None:
        """CLIPSimScore process
        Process one batch of data samples and predictions. The processed
        results should be stored in ``self.results``, which will be used to
        compute the metrics when all batches have been processed.

        Args:
            data_batch (Sequence): A batch of data from the dataloader.
            data_samples (Sequence): A batch of data samples that
                contain annotations and predictions.
        """

        result = dict()

        input_prompts, input_videos = data_samples
        bsz = len(input_prompts)

        # Ensure prompt_input is a tensor
        if isinstance(input_prompts, tuple):
            input_prompts = list(input_prompts)

        if isinstance(input_videos, tuple):
            input_videos = list(input_videos)

        # Initialize an empty list to store each similarity score
        clip_score_sum, clip_score_cnt = 0, 0
        logit_scale = self.model.logit_scale.exp() if self.logit_scale else 1
        with torch.no_grad():
            for input_prompt, input_frames in zip(input_prompts, input_videos):
                input_prompt = input_prompt.to(self.device)
                text_feature = self.model.get_text_features(input_prompt) # [bsz, hid_dim]
                text_feature = text_feature / torch.norm(text_feature, dim=-1, keepdim=True)

                input_frames = input_frames.to(self.device)  # Add batch dimension and move the frame to the device
                frame_feature = self.model.get_image_features(input_frames)
                frame_feature = frame_feature / torch.norm(frame_feature, dim=-1, keepdim=True)

                clip_score = logit_scale * (frame_feature @ text_feature.T).mean().item()
                print('current clip similarity score', clip_score)
                clip_score_sum += clip_score
                clip_score_cnt += 1

        # Calculate the average CLIP score across all frames
        clip_score_videos_avg = clip_score_sum/clip_score_cnt

        result['clip_sim_score'] = clip_score_videos_avg

        self.results.append(result)


    def compute_metrics(self, results: list) -> Dict[str, float]:
        """Compute the metrics from processed results.

        Args:
            results (list): The processed results of each batch.

        Returns:
            Dict[str, float]: The computed metrics. The keys are the names of
            the metrics, and the values are corresponding results.
        """
        logger: MMLogger = MMLogger.get_current_instance()

        clip_score_np = np.zeros(len(results))
        for i, result in enumerate(results):
            clip_score_np[i] = result['clip_sim_score']

        clip_sim_mean = np.mean(clip_score_np) 

        print("Test results: clip similarity score={:.4f}"
              .format(clip_sim_mean))

        return {'clip_sim_score': clip_sim_mean}

compute_metrics(results)

Compute the metrics from processed results.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `results` | `list` | The processed results of each batch. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Dict[str, float]` | The computed metrics. The keys are the names of the metrics, and the values are corresponding results. |

Source code in aigve/metrics/text_video_alignment/similarity_based/clipscore/clipsim.py
def compute_metrics(self, results: list) -> Dict[str, float]:
    """Compute the metrics from processed results.

    Args:
        results (list): The processed results of each batch.

    Returns:
        Dict[str, float]: The computed metrics. The keys are the names of
        the metrics, and the values are corresponding results.
    """
    logger: MMLogger = MMLogger.get_current_instance()

    clip_score_np = np.zeros(len(results))
    for i, result in enumerate(results):
        clip_score_np[i] = result['clip_sim_score']

    clip_sim_mean = np.mean(clip_score_np) 

    print("Test results: clip similarity score={:.4f}"
          .format(clip_sim_mean))

    return {'clip_sim_score': clip_sim_mean}

process(data_batch, data_samples)

CLIPSimScore process. Process one batch of data samples and predictions. The processed results should be stored in self.results, which will be used to compute the metrics when all batches have been processed.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data_batch` | `Sequence` | A batch of data from the dataloader. | *required* |
| `data_samples` | `Sequence` | A batch of data samples that contain annotations and predictions. | *required* |

Source code in aigve/metrics/text_video_alignment/similarity_based/clipscore/clipsim.py
def process(self, data_batch: Sequence, data_samples: Sequence) -> None:
    """CLIPSimScore process
    Process one batch of data samples and predictions. The processed
    results should be stored in ``self.results``, which will be used to
    compute the metrics when all batches have been processed.

    Args:
        data_batch (Sequence): A batch of data from the dataloader.
        data_samples (Sequence): A batch of data samples that
            contain annotations and predictions.
    """

    result = dict()

    input_prompts, input_videos = data_samples
    bsz = len(input_prompts)

    # Ensure prompt_input is a tensor
    if isinstance(input_prompts, tuple):
        input_prompts = list(input_prompts)

    if isinstance(input_videos, tuple):
        input_videos = list(input_videos)

    # Initialize an empty list to store each similarity score
    clip_score_sum, clip_score_cnt = 0, 0
    logit_scale = self.model.logit_scale.exp() if self.logit_scale else 1
    with torch.no_grad():
        for input_prompt, input_frames in zip(input_prompts, input_videos):
            input_prompt = input_prompt.to(self.device)
            text_feature = self.model.get_text_features(input_prompt) # [bsz, hid_dim]
            text_feature = text_feature / torch.norm(text_feature, dim=-1, keepdim=True)

            input_frames = input_frames.to(self.device)  # Add batch dimension and move the frame to the device
            frame_feature = self.model.get_image_features(input_frames)
            frame_feature = frame_feature / torch.norm(frame_feature, dim=-1, keepdim=True)

            clip_score = logit_scale * (frame_feature @ text_feature.T).mean().item()
            print('current clip similarity score', clip_score)
            clip_score_sum += clip_score
            clip_score_cnt += 1

    # Calculate the average CLIP score across all frames
    clip_score_videos_avg = clip_score_sum/clip_score_cnt

    result['clip_sim_score'] = clip_score_videos_avg

    self.results.append(result)
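
As a minimal usage sketch (not from the AIGVE source, and assuming `CLIPSimScore` has been imported from the module above), the metric's bundled `processor` can turn a prompt and a few frames into the `input_ids` and `pixel_values` tensors that `process` expects; the prompt text and random frames below are illustrative.

```python
import numpy as np

metric = CLIPSimScore()  # processor_name and model_name default to openai/clip-vit-base-patch32

# Four random RGB frames standing in for a decoded video.
frames = [np.random.randint(0, 255, (224, 224, 3), dtype=np.uint8) for _ in range(4)]
inputs = metric.processor(text=["a dog running on grass"], images=frames,
                          return_tensors="pt", padding=True)

# data_samples is a (prompts, videos) pair; each element holds one tensor per video.
metric.process(data_batch=None, data_samples=([inputs["input_ids"]], [inputs["pixel_values"]]))
print(metric.compute_metrics(metric.results))  # {'clip_sim_score': ...}
```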

CLIPTempScore

Bases: BaseMetric

Initialize the CLIPTempScore evaluator.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `model_name` | `str` | The name of the CLIP encoder model. Defaults to `openai/clip-vit-base-patch32`. | `'openai/clip-vit-base-patch32'` |
| `logit_scale` | `bool` | Whether to calculate the cosine similarity as logits. Defaults to `False`. | `False` |

Source code in aigve/metrics/text_video_alignment/similarity_based/clipscore/cliptemp.py
@METRICS.register_module()
class CLIPTempScore(BaseMetric):
    """ Initialize the ``CLIPTempScore`` evaluator.

    Args:
        model_name (str): The name of the CLIP encoder model. Defaults to ``openai/clip-vit-base-patch32``.
        logit_scale (bool): Whether to calculate the cosine similarity as logits. Defaults to False.

    """
    def __init__(self,
                 model_name: str = "openai/clip-vit-base-patch32",
                 logit_scale: bool = False,
                #  train_index: int = 4
                 ) -> None:
        super().__init__()
        self.model_name = model_name
        self.logit_scale = logit_scale

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = CLIPModel.from_pretrained(self.model_name).to(self.device)
        self.model.eval()

    def process(self, data_batch: Sequence, data_samples: Sequence) -> None:
        """CLIPTempScore process
        Process one batch of data samples and predictions. The processed
        results should be stored in ``self.results``, which will be used to
        compute the metrics when all batches have been processed.

        Args:
            data_batch (Sequence): A batch of data from the dataloader.
            data_samples (Sequence): A batch of data samples that
                contain annotations and predictions.
        """

        result = dict()

        input_videos = data_samples
        # bsz = len(input_videos)


        # Ensure prompt_input is a tensor        
        if isinstance(input_videos, tuple):
            input_videos = list(input_videos)

        # Generate embeddings for each frame and concatenate the features
        clip_temp_score_sum, clip_temp_score_cnt = 0, 0
        logit_scale = self.model.logit_scale.exp() if self.logit_scale else 1
        with torch.no_grad():  
            for input_frames in input_videos: # Too many frames in a video, must split before CLIP embedding, limited by the memory
                input_frames = input_frames.to(self.device)
                frame_feature = self.model.get_image_features(input_frames)
                frame_feature = frame_feature / torch.norm(frame_feature, dim=-1, keepdim=True)
                # print(frame_feature.shape)

                clip_temp_score_list = []
                for i in range(frame_feature.shape[0]-1):
                    clip_temp_score = logit_scale * frame_feature[i].unsqueeze(0) @ frame_feature[i+1].unsqueeze(0).T
                    clip_temp_score = clip_temp_score.item()
                    # print(clip_temp_score)
                    clip_temp_score_list.append(clip_temp_score)
                clip_temp_cur_avg_score = sum(clip_temp_score_list)/len(clip_temp_score_list)
                clip_temp_score_sum += clip_temp_cur_avg_score
                clip_temp_score_cnt += 1
                print('current clip temp similarity score', clip_temp_cur_avg_score)

        clip_temp_score_avg = clip_temp_score_sum/clip_temp_score_cnt

        result['clip_temp_score'] = clip_temp_score_avg

        self.results.append(result)


    def compute_metrics(self, results: list) -> Dict[str, float]:
        """Compute the metrics from processed results.

        Args:
            results (list): The processed results of each batch.

        Returns:
            Dict[str, float]: The computed metrics. The keys are the names of
            the metrics, and the values are corresponding results.
        """
        logger: MMLogger = MMLogger.get_current_instance()

        clip_score_np = np.zeros(len(results))
        for i, result in enumerate(results):
            clip_score_np[i] = result['clip_temp_score']

        clip_temp_mean = np.mean(clip_score_np) 

        print("Test results: clip temporal consistency score={:.4f}"
              .format(clip_temp_mean))

        return {'clip_temp_score': clip_temp_mean}

compute_metrics(results)

Compute the metrics from processed results.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `results` | `list` | The processed results of each batch. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Dict[str, float]` | The computed metrics. The keys are the names of the metrics, and the values are corresponding results. |

Source code in aigve/metrics/text_video_alignment/similarity_based/clipscore/cliptemp.py
def compute_metrics(self, results: list) -> Dict[str, float]:
    """Compute the metrics from processed results.

    Args:
        results (list): The processed results of each batch.

    Returns:
        Dict[str, float]: The computed metrics. The keys are the names of
        the metrics, and the values are corresponding results.
    """
    logger: MMLogger = MMLogger.get_current_instance()

    clip_score_np = np.zeros(len(results))
    for i, result in enumerate(results):
        clip_score_np[i] = result['clip_temp_score']

    clip_temp_mean = np.mean(clip_score_np) 

    print("Test results: clip temporal consistency score={:.4f}"
          .format(clip_temp_mean))

    return {'clip_temp_score': clip_temp_mean}

process(data_batch, data_samples)

CLIPTempScore process. Process one batch of data samples and predictions. The processed results should be stored in self.results, which will be used to compute the metrics when all batches have been processed.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data_batch` | `Sequence` | A batch of data from the dataloader. | *required* |
| `data_samples` | `Sequence` | A batch of data samples that contain annotations and predictions. | *required* |

Source code in aigve/metrics/text_video_alignment/similarity_based/clipscore/cliptemp.py
def process(self, data_batch: Sequence, data_samples: Sequence) -> None:
    """CLIPTempScore process
    Process one batch of data samples and predictions. The processed
    results should be stored in ``self.results``, which will be used to
    compute the metrics when all batches have been processed.

    Args:
        data_batch (Sequence): A batch of data from the dataloader.
        data_samples (Sequence): A batch of data samples that
            contain annotations and predictions.
    """

    result = dict()

    input_videos = data_samples
    # bsz = len(input_videos)


    # Ensure prompt_input is a tensor        
    if isinstance(input_videos, tuple):
        input_videos = list(input_videos)

    # Generate embeddings for each frame and concatenate the features
    clip_temp_score_sum, clip_temp_score_cnt = 0, 0
    logit_scale = self.model.logit_scale.exp() if self.logit_scale else 1
    with torch.no_grad():  
        for input_frames in input_videos: # Too many frames in a video, must split before CLIP embedding, limited by the memory
            input_frames = input_frames.to(self.device)
            frame_feature = self.model.get_image_features(input_frames)
            frame_feature = frame_feature / torch.norm(frame_feature, dim=-1, keepdim=True)
            # print(frame_feature.shape)

            clip_temp_score_list = []
            for i in range(frame_feature.shape[0]-1):
                clip_temp_score = logit_scale * frame_feature[i].unsqueeze(0) @ frame_feature[i+1].unsqueeze(0).T
                clip_temp_score = clip_temp_score.item()
                # print(clip_temp_score)
                clip_temp_score_list.append(clip_temp_score)
            clip_temp_cur_avg_score = sum(clip_temp_score_list)/len(clip_temp_score_list)
            clip_temp_score_sum += clip_temp_cur_avg_score
            clip_temp_score_cnt += 1
            print('current clip temp similarity score', clip_temp_cur_avg_score)

    clip_temp_score_avg = clip_temp_score_sum/clip_temp_score_cnt

    result['clip_temp_score'] = clip_temp_score_avg

    self.results.append(result)
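
A minimal sketch (illustrative, not from the AIGVE source, assuming `CLIPTempScore` is imported from the module above): this metric only consumes videos, so `data_samples` is simply a collection of per-video `pixel_values` tensors; consecutive frame embeddings are compared with cosine similarity and averaged.

```python
import torch

metric = CLIPTempScore()             # model_name defaults to openai/clip-vit-base-patch32
frames = torch.rand(8, 3, 224, 224)  # [T, C, H, W] dummy pixel_values for one video

metric.process(data_batch=None, data_samples=[frames])
print(metric.compute_metrics(metric.results))  # {'clip_temp_score': ...}
```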

DSGScore

Bases: BaseMetric

Initialize the DSGScore evaluator.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `vqa_model_name` | `str` | The name of the VQA model used in the DSGScore evaluator. Defaults to `InstructBLIP`; you can also choose `MPLUG` as the VQA model. | `'InstructBLIP'` |
| `verbose` | `bool` | Whether to print intermediate outputs. Defaults to `False`. | `False` |

Source code in aigve/metrics/text_video_alignment/gpt_based/dsg/dsg_eval.py
@METRICS.register_module()
class DSGScore(BaseMetric):
    """ Initialize the ``DSGScore`` evaluator.

    Args:
        vqa_model_name (str): The name of the VQA model used in the DSGScore evaluator. Defaults to ``InstructBLIP``; you can also choose "MPLUG" as the VQA model.
        verbose (bool): Whether to print intermediate outputs. Defaults to False.
    """
    def __init__(self, 
                 vqa_model_name: str = "InstructBLIP",
                 verbose: bool = False):
        super().__init__()

        self.submodel_path = 'metrics/text_video_alignment/gpt_based/dsg'
        if not submodule_exists(self.submodel_path):
            add_git_submodule(
                repo_url='https://github.com/j-min/DSG.git', 
                submodule_path=self.submodel_path
            )     
        from .DSG.dsg.vqa_utils import MPLUG, InstructBLIP

        self.vqa_model_name = vqa_model_name
        assert self.vqa_model_name in ["InstructBLIP", "MPLUG"]
        if self.vqa_model_name == 'InstructBLIP':
            self.vqa_model = InstructBLIP()
        else:
            self.vqa_model = MPLUG()

        self.verbose = verbose
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


    def evaluate_image_dsg(self, qid_list, frame_index, frame) -> Dict[str, Union[int, dict, float]]:
        """ Evaluate a generated image with DSG evaluator; this is the intermediate process of the ``process`` function. 

        Args:
            qid_list (List[str]): The list of DSG parse question generation results.
            frame_index (int): The index number of the currently evaluated frame.
            frame (List[List[float]]): The current evaluated frame.

        Returns:
            Dict[str, Union[int, dict, float]]: A dictionary containing evaluation results with the following keys:
                - 'frame_index' (int): The index of the evaluated frame.
                - 'qid2tuple' (dict): Mapping of question IDs to tuples.
                - 'qid2dependency' (dict): Mapping of question IDs to dependencies.
                - 'qid2question' (dict): Mapping of question IDs to actual questions.
                - 'qid2answer' (dict): Mapping of question IDs to predicted answers.
                - 'qid2scores' (dict): Mapping of question IDs to scores before dependency filtering.
                - 'qid2validity' (dict): Mapping of question IDs to boolean validity after dependency filtering.
                - 'average_score_with_dependency' (float): Average score considering dependency filtering.
                - 'average_score_without_dependency' (float): Average score before dependency filtering.
        """
        if self.verbose:
            print("#"*50)
            print("2) Answer questions given the generated image, with VQA")
            print("#"*50)

        # 2) answer questions with the generated image
        qid2answer = {}
        qid2scores = {}

        qid2tuple, qid2dependency, qid2question = qid_list
        for id, question in qid2question.items():
            answer = self.vqa_model.vqa(image=frame, question=question)
            print(answer)
            qid2answer[id] = answer
            qid2scores[id] = float('yes' in answer)

        average_score_without_dep = sum(qid2scores.values()) / len(qid2scores)
        print(average_score_without_dep, qid2answer, qid2scores)

        if self.verbose:
            print("#"*50)
            print("3) Zero-out scores from invalid questions")
            print("#"*50)

        # 3) zero-out scores from invalid questions 
        qid2validity = {}
        qid2scores_after_filtering = deepcopy(qid2scores)

        # print('qid2scores', qid2scores)
        # print('qid2dependency', qid2dependency)
        for id, parent_ids in qid2dependency.items():
            # zero-out scores if parent questions are answered 'no'
            any_parent_answered_no = False
            for parent_id in parent_ids:
                parent_id = list(parent_id)[0]
                if parent_id == 0:
                    continue
                if qid2scores[parent_id] == 0:
                    any_parent_answered_no = True
                    break
            if any_parent_answered_no:
                qid2scores_after_filtering[id] = 0.0
                qid2validity[id] = False
            else:
                qid2validity[id] = True

        if self.verbose:
            print("Per-quesiton eval results (after using dependency)")
            for id in qid2question:
                print("ID", id)
                print("question", qid2question[id])
                print("answer", qid2answer[id])
                print("validity", qid2validity[id])
                print("score (before filtering)", qid2scores[id])
                print("score (after filtering)", qid2scores_after_filtering[id])
                print()

        if self.verbose:
            print("#"*50)
            print("4) Calculate the final score by averaging")
            print("#"*50)

        average_score_with_dep = sum(qid2scores_after_filtering.values()) / len(qid2scores)

        return {
            'frame_index': frame_index,
            'qid2tuple': qid2tuple,
            'qid2dependency': qid2dependency,
            'qid2question': qid2question,
            'qid2answer': qid2answer,
            'qid2scores': qid2scores,
            'qid2validity': qid2validity,
            'average_score_with_dependency': average_score_with_dep,
            'average_score_without_dependency': average_score_without_dep
        }


    def process(self, data_batch: Sequence, data_samples: Sequence) -> None:
        """DSGScore process

        Process one batch of data samples and predictions. The processed
        results should be stored in ``self.results``, which will be used to
        compute the metrics when all batches have been processed.

        Args:
            data_batch (Sequence): A batch of data from the dataloader.
            data_samples (Sequence): A batch of data samples that
                contain annotations and predictions.
        """

        result = dict()

        input_qid_lists, input_videos = data_samples
        bsz = len(input_qid_lists)
        # print('input_qid_lists: ', input_qid_lists)

        # Ensure prompt_input is a tensor
        if isinstance(input_qid_lists, tuple):
            input_qid_lists = list(input_qid_lists)

        if isinstance(input_videos, tuple):
            input_videos = list(input_videos)

        average_dep_score_list, average_wo_dep_score_list = [], []
        for input_qid_list, input_video in zip([input_qid_lists], input_videos):
            evaluate_dict_list = []
            dep_score, wo_dep_score = [], []
            for index, frame in enumerate(input_video):
                # print('input_qid_list: ', input_qid_list)
                evaluate_dict = self.evaluate_image_dsg(qid_list=input_qid_list, 
                                                        frame_index=index, 
                                                        frame=frame)
                evaluate_dict_list.append(evaluate_dict)
                frame_average_score_with_dependency = evaluate_dict['average_score_with_dependency']
                dep_score.append(frame_average_score_with_dependency)
                frame_average_score_without_dependency = evaluate_dict['average_score_without_dependency']
                wo_dep_score.append(frame_average_score_without_dependency)
            avg_dep_score, avg_wo_dep_score = sum(dep_score)/len(dep_score), sum(wo_dep_score)/len(dep_score)
            average_dep_score_list.append(avg_dep_score)
            average_wo_dep_score_list.append(avg_wo_dep_score)


        result['average_dep_dgs_score'] = sum(average_dep_score_list)/len(average_dep_score_list)
        result['average_wo_dep_dgs_score'] = sum(average_wo_dep_score_list)/len(average_wo_dep_score_list)

        self.results.append(result)


    def compute_metrics(self, results: list) -> Dict[str, float]:
        """Compute the metrics from processed results.

        Args:
            results (list): The processed results of each batch.

        Returns:
            Dict[str, float]: The computed metrics. The keys are the names of
            the metrics, and the values are corresponding results.
        """
        logger: MMLogger = MMLogger.get_current_instance()

        dep_dsg_score_np = np.zeros(len(results))
        wo_dep_dsg_score_np = np.zeros(len(results))
        for i, result in enumerate(results):
            dep_dsg_score_np[i] = result['average_dep_dgs_score']
            wo_dep_dsg_score_np[i] = result['average_wo_dep_dgs_score']

        dep_dsg_score_np_mean = np.mean(dep_dsg_score_np) 
        wo_dep_dsg_score_np_mean = np.mean(wo_dep_dsg_score_np)

        print("Test results: dsg score with dependency={:.4f}"
              .format(dep_dsg_score_np_mean))
        print("Test results: dsg score without dependency={:.4f}"
              .format(wo_dep_dsg_score_np_mean))

        return {'average_dep_dgs_score': dep_dsg_score_np_mean,
                'average_wo_dep_dgs_score': wo_dep_dsg_score_np_mean}

compute_metrics(results)

Compute the metrics from processed results.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `results` | `list` | The processed results of each batch. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Dict[str, float]` | The computed metrics. The keys are the names of the metrics, and the values are corresponding results. |

Source code in aigve/metrics/text_video_alignment/gpt_based/dsg/dsg_eval.py
def compute_metrics(self, results: list) -> Dict[str, float]:
    """Compute the metrics from processed results.

    Args:
        results (list): The processed results of each batch.

    Returns:
        Dict[str, float]: The computed metrics. The keys are the names of
        the metrics, and the values are corresponding results.
    """
    logger: MMLogger = MMLogger.get_current_instance()

    dep_dsg_score_np = np.zeros(len(results))
    wo_dep_dsg_score_np = np.zeros(len(results))
    for i, result in enumerate(results):
        dep_dsg_score_np[i] = result['average_dep_dgs_score']
        wo_dep_dsg_score_np[i] = result['average_wo_dep_dgs_score']

    dep_dsg_score_np_mean = np.mean(dep_dsg_score_np) 
    wo_dep_dsg_score_np_mean = np.mean(wo_dep_dsg_score_np)

    print("Test results: dsg score with dependency={:.4f}"
          .format(dep_dsg_score_np_mean))
    print("Test results: dsg score without dependency={:.4f}"
          .format(wo_dep_dsg_score_np_mean))

    return {'average_dep_dgs_score': dep_dsg_score_np_mean,
            'average_wo_dep_dgs_score': wo_dep_dsg_score_np_mean}

evaluate_image_dsg(qid_list, frame_index, frame)

Evaluate a generated image with DSG evaluator; this is the intermediate process of the process function.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `qid_list` | `List[str]` | The list of DSG parse question generation results. | *required* |
| `frame_index` | `int` | The index number of the currently evaluated frame. | *required* |
| `frame` | `List[List[float]]` | The current evaluated frame. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Dict[str, Union[int, dict, float]]` | A dictionary containing evaluation results with the keys listed below. |

- 'frame_index' (int): The index of the evaluated frame.
- 'qid2tuple' (dict): Mapping of question IDs to tuples.
- 'qid2dependency' (dict): Mapping of question IDs to dependencies.
- 'qid2question' (dict): Mapping of question IDs to actual questions.
- 'qid2answer' (dict): Mapping of question IDs to predicted answers.
- 'qid2scores' (dict): Mapping of question IDs to scores before dependency filtering.
- 'qid2validity' (dict): Mapping of question IDs to boolean validity after dependency filtering.
- 'average_score_with_dependency' (float): Average score considering dependency filtering.
- 'average_score_without_dependency' (float): Average score before dependency filtering.

Source code in aigve/metrics/text_video_alignment/gpt_based/dsg/dsg_eval.py
def evaluate_image_dsg(self, qid_list, frame_index, frame) -> Dict[str, Union[int, dict, float]]:
    """ Evaluate a generated image with DSG evaluator; this is the intermediate process of the ``process`` function. 

    Args:
        qid_list (List[str]): The list of DSG parse question generation results.
        frame_index (int): The index number of the currently evaluated frame.
        frame (List[List[float]]): The current evaluated frame.

    Returns:
        Dict[str, Union[int, dict, float]]: A dictionary containing evaluation results with the following keys:
            - 'frame_index' (int): The index of the evaluated frame.
            - 'qid2tuple' (dict): Mapping of question IDs to tuples.
            - 'qid2dependency' (dict): Mapping of question IDs to dependencies.
            - 'qid2question' (dict): Mapping of question IDs to actual questions.
            - 'qid2answer' (dict): Mapping of question IDs to predicted answers.
            - 'qid2scores' (dict): Mapping of question IDs to scores before dependency filtering.
            - 'qid2validity' (dict): Mapping of question IDs to boolean validity after dependency filtering.
            - 'average_score_with_dependency' (float): Average score considering dependency filtering.
            - 'average_score_without_dependency' (float): Average score before dependency filtering.
    """
    if self.verbose:
        print("#"*50)
        print("2) Answer questions given the generated image, with VQA")
        print("#"*50)

    # 2) answer questions with the generated image
    qid2answer = {}
    qid2scores = {}

    qid2tuple, qid2dependency, qid2question = qid_list
    for id, question in qid2question.items():
        answer = self.vqa_model.vqa(image=frame, question=question)
        print(answer)
        qid2answer[id] = answer
        qid2scores[id] = float('yes' in answer)

    average_score_without_dep = sum(qid2scores.values()) / len(qid2scores)
    print(average_score_without_dep, qid2answer, qid2scores)

    if self.verbose:
        print("#"*50)
        print("3) Zero-out scores from invalid questions")
        print("#"*50)

    # 3) zero-out scores from invalid questions 
    qid2validity = {}
    qid2scores_after_filtering = deepcopy(qid2scores)

    # print('qid2scores', qid2scores)
    # print('qid2dependency', qid2dependency)
    for id, parent_ids in qid2dependency.items():
        # zero-out scores if parent questions are answered 'no'
        any_parent_answered_no = False
        for parent_id in parent_ids:
            parent_id = list(parent_id)[0]
            if parent_id == 0:
                continue
            if qid2scores[parent_id] == 0:
                any_parent_answered_no = True
                break
        if any_parent_answered_no:
            qid2scores_after_filtering[id] = 0.0
            qid2validity[id] = False
        else:
            qid2validity[id] = True

    if self.verbose:
        print("Per-quesiton eval results (after using dependency)")
        for id in qid2question:
            print("ID", id)
            print("question", qid2question[id])
            print("answer", qid2answer[id])
            print("validity", qid2validity[id])
            print("score (before filtering)", qid2scores[id])
            print("score (after filtering)", qid2scores_after_filtering[id])
            print()

    if self.verbose:
        print("#"*50)
        print("4) Calculate the final score by averaging")
        print("#"*50)

    average_score_with_dep = sum(qid2scores_after_filtering.values()) / len(qid2scores)

    return {
        'frame_index': frame_index,
        'qid2tuple': qid2tuple,
        'qid2dependency': qid2dependency,
        'qid2question': qid2question,
        'qid2answer': qid2answer,
        'qid2scores': qid2scores,
        'qid2validity': qid2validity,
        'average_score_with_dependency': average_score_with_dep,
        'average_score_without_dependency': average_score_without_dep
    }
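
Restating the two averages computed above in formula form: with per-question scores $s_q \in \{0, 1\}$ (1 if the VQA answer to question $q$ contains "yes") and $\mathrm{dep}(q)$ the set of parent questions of $q$ (the placeholder parent id 0 is ignored),

$$
\text{average\_score\_without\_dependency} = \frac{1}{|Q|}\sum_{q \in Q} s_q,
\qquad
\text{average\_score\_with\_dependency} = \frac{1}{|Q|}\sum_{q \in Q} s_q \prod_{p \in \mathrm{dep}(q)} \mathbb{1}\left[s_p = 1\right].
$$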

process(data_batch, data_samples)

DSGScore process

Process one batch of data samples and predictions. The processed results should be stored in self.results, which will be used to compute the metrics when all batches have been processed.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data_batch` | `Sequence` | A batch of data from the dataloader. | *required* |
| `data_samples` | `Sequence` | A batch of data samples that contain annotations and predictions. | *required* |

Source code in aigve/metrics/text_video_alignment/gpt_based/dsg/dsg_eval.py
def process(self, data_batch: Sequence, data_samples: Sequence) -> None:
    """DSGScore process

    Process one batch of data samples and predictions. The processed
    results should be stored in ``self.results``, which will be used to
    compute the metrics when all batches have been processed.

    Args:
        data_batch (Sequence): A batch of data from the dataloader.
        data_samples (Sequence): A batch of data samples that
            contain annotations and predictions.
    """

    result = dict()

    input_qid_lists, input_videos = data_samples
    bsz = len(input_qid_lists)
    # print('input_qid_lists: ', input_qid_lists)

    # Ensure prompt_input is a tensor
    if isinstance(input_qid_lists, tuple):
        input_qid_lists = list(input_qid_lists)

    if isinstance(input_videos, tuple):
        input_videos = list(input_videos)

    average_dep_score_list, average_wo_dep_score_list = [], []
    for input_qid_list, input_video in zip([input_qid_lists], input_videos):
        evaluate_dict_list = []
        dep_score, wo_dep_score = [], []
        for index, frame in enumerate(input_video):
            # print('input_qid_list: ', input_qid_list)
            evaluate_dict = self.evaluate_image_dsg(qid_list=input_qid_list, 
                                                    frame_index=index, 
                                                    frame=frame)
            evaluate_dict_list.append(evaluate_dict)
            frame_average_score_with_dependency = evaluate_dict['average_score_with_dependency']
            dep_score.append(frame_average_score_with_dependency)
            frame_average_score_without_dependency = evaluate_dict['average_score_without_dependency']
            wo_dep_score.append(frame_average_score_without_dependency)
        avg_dep_score, avg_wo_dep_score = sum(dep_score)/len(dep_score), sum(wo_dep_score)/len(dep_score)
        average_dep_score_list.append(avg_dep_score)
        average_wo_dep_score_list.append(avg_wo_dep_score)


    result['average_dep_dgs_score'] = sum(average_dep_score_list)/len(average_dep_score_list)
    result['average_wo_dep_dgs_score'] = sum(average_wo_dep_score_list)/len(average_wo_dep_score_list)

    self.results.append(result)
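
The toy snippet below (not AIGVE code; the question ids and scores are hand-written) isolates the dependency-filtering step of `evaluate_image_dsg` to make the with/without-dependency difference concrete: a question keeps its score only if every parent question (other than the placeholder id 0) was answered "yes".

```python
qid2scores = {1: 1.0, 2: 0.0, 3: 1.0}       # raw per-question VQA scores
qid2dependency = {1: [0], 2: [1], 3: [2]}   # question 3 depends on question 2

filtered = dict(qid2scores)
for qid, parents in qid2dependency.items():
    if any(p != 0 and qid2scores[p] == 0 for p in parents):
        filtered[qid] = 0.0                 # zero out children of a "no" answer

print(sum(qid2scores.values()) / len(qid2scores))  # 0.667 -> score without dependency
print(sum(filtered.values()) / len(filtered))      # 0.333 -> score with dependency
```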

FIDScore

Bases: BaseMetric

Source code in aigve/metrics/video_quality_assessment/distribution_based/fid_metric.py
@METRICS.register_module()
class FIDScore(BaseMetric):

    def __init__(self, 
                 model_name: str = 'inception_v3', 
                 input_shape: tuple = (299, 299, 3), 
                 is_gpu: bool = True):
        super(FIDScore, self).__init__()
        self.device = torch.device("cuda" if is_gpu else "cpu")
        self.model_name = model_name
        self.input_shape = input_shape
        if self.model_name == "inception_v3":
            self.model = models.inception_v3(pretrained=True, transform_input=False)
            self.model.fc = nn.Identity()  # Remove classification head
            self.model.eval().to(self.device)
        else:
            raise ValueError(f"Model '{self.model_name}' is not supported for FID computation.")

        # Define preprocessing for InceptionV3
        self.transform = transforms.Compose([
            transforms.Resize((self.input_shape[0], self.input_shape[1])),  # InceptionV3 input size
            transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])  # Normalize to [-1, 1]
        ])

    def preprocess_tensor(self, video_tensor: torch.Tensor) -> torch.Tensor:
        """
        Resize and normalize a video tensor.

        Args:
            video_tensor (torch.Tensor): Tensor of shape [T, C, H, W].

        Returns:
            torch.Tensor: Preprocessed tensor of shape [T, C, H, W].
        """
        video_tensor = self.transform(video_tensor / 255.0)
        return video_tensor

    def calculate_statistics(self, video_tensor: torch.Tensor) -> tuple[np.ndarray, np.ndarray]:
        """
        Calculate activation statistics (mean and covariance) from video frames.

        Args:
            video_tensor (torch.Tensor): Video tensor [T, C, H, W].

        Returns:
            Tuple of mean and covariance matrix.
        """
        video_tensor = self.preprocess_tensor(video_tensor).to(self.device)
        with torch.no_grad():
            features = self.model(video_tensor).cpu().numpy()  # Extract 2048-d feature vectors

        mu = features.mean(axis=0)
        sigma = np.cov(features, rowvar=False)
        return mu, sigma

    def calculate_fid(self, real: torch.Tensor, fake: torch.Tensor) -> float:
        """
        Calculate FID score between real and generated videos.

        Args:
            real (torch.Tensor): Real video tensor [T, C, H, W].
            fake (torch.Tensor): Generated video tensor [T, C, H, W].

        Returns:
            float: FID score.
        """
        mu1, sigma1 = self.calculate_statistics(real) # Shape[2048], Shape[2048, 2048]
        mu2, sigma2 = self.calculate_statistics(fake)

        # Compute FID score
        ssdiff = np.sum((mu1 - mu2) ** 2.0)
        covmean = sqrtm(sigma1 @ sigma2)

        # Check and correct for imaginary numbers
        if np.iscomplexobj(covmean):
            covmean = covmean.real

        fid = ssdiff + np.trace(sigma1 + sigma2 - 2.0 * covmean)
        return fid


    def process(self, data_batch: dict, data_samples: Sequence[dict]) -> None:
        """
        Process one batch of data samples and compute FID.

        Args:
            data_batch (dict): A batch of data from the dataloader (not used here).
            data_samples (List[Tuple[torch.Tensor], Tuple[torch.Tensor], Tuple[str], Tuple[str]]):
                A list containing four tuples:
                - A tuple of `real_tensor` (torch.Tensor): Real video tensor [T, C, H, W].
                - A tuple of `gen_tensor` (torch.Tensor): Generated video tensor [T, C, H, W].
                - A tuple of `real_video_name` (str): Ground-truth video filename.
                - A tuple of `gen_video_name` (str): Generated video filename.
                The length of each tuple is the batch size.
        """
        results = []
        real_tensor_tuple, gen_tensor_tuple, real_video_name_tuple, gen_video_name_tuple = data_samples

        batch_size = len(real_tensor_tuple)
        with torch.no_grad():
            for i in range(batch_size):
                real_video_name = real_video_name_tuple[i]
                gen_video_name = gen_video_name_tuple[i]
                real_tensor = real_tensor_tuple[i]
                gen_tensor = gen_tensor_tuple[i]
                fid_score = self.calculate_fid(real_tensor, gen_tensor)

                results.append({
                    "Real video_name": real_video_name, 
                    "Generated video_name": gen_video_name, 
                    "FID_Score": fid_score
                })
                print(f"Processed score {fid_score:.4f} between {real_video_name} and {gen_video_name}")

        self.results.extend(results)

    def compute_metrics(self, results: list) -> Dict[str, float]:
        """Compute the final FID score."""
        scores = np.array([res["FID_Score"] for res in self.results])
        mean_score = np.mean(scores) if scores.size > 0 else 0.0
        print(f"FID mean score: {mean_score:.4f}")

        json_file_path = os.path.join(os.getcwd(), "fid_results.json")
        final_results = {
            "video_results": self.results, 
            "FID_Mean_Score": mean_score
        }
        with open(json_file_path, "w") as json_file:
            json.dump(final_results, json_file, indent=4)
        print(f"FID mean score saved to {json_file_path}")

        return {'FID_Mean_Score': mean_score}

calculate_fid(real, fake)

Calculate FID score between real and generated videos.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `real` | `Tensor` | Real video tensor [T, C, H, W]. | *required* |
| `fake` | `Tensor` | Generated video tensor [T, C, H, W]. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `float` | FID score. |

Source code in aigve/metrics/video_quality_assessment/distribution_based/fid_metric.py
def calculate_fid(self, real: torch.Tensor, fake: torch.Tensor) -> float:
    """
    Calculate FID score between real and generated videos.

    Args:
        real (torch.Tensor): Real video tensor [T, C, H, W].
        fake (torch.Tensor): Generated video tensor [T, C, H, W].

    Returns:
        float: FID score.
    """
    mu1, sigma1 = self.calculate_statistics(real) # Shape[2048], Shape[2048, 2048]
    mu2, sigma2 = self.calculate_statistics(fake)

    # Compute FID score
    ssdiff = np.sum((mu1 - mu2) ** 2.0)
    covmean = sqrtm(sigma1 @ sigma2)

    # Check and correct for imaginary numbers
    if np.iscomplexobj(covmean):
        covmean = covmean.real

    fid = ssdiff + np.trace(sigma1 + sigma2 - 2.0 * covmean)
    return fid
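
The computation above is the standard Fréchet distance between two Gaussians fitted to the InceptionV3 frame features, i.e.

$$
\mathrm{FID} = \lVert \mu_1 - \mu_2 \rVert_2^2 + \mathrm{Tr}\!\left(\Sigma_1 + \Sigma_2 - 2\,(\Sigma_1 \Sigma_2)^{1/2}\right),
$$

where $(\mu_1, \Sigma_1)$ and $(\mu_2, \Sigma_2)$ are the feature mean and covariance of the real and generated videos returned by `calculate_statistics`.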

calculate_statistics(video_tensor)

Calculate activation statistics (mean and covariance) from video frames.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `video_tensor` | `Tensor` | Video tensor [T, C, H, W]. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `tuple[ndarray, ndarray]` | Tuple of mean and covariance matrix. |

Source code in aigve/metrics/video_quality_assessment/distribution_based/fid_metric.py
def calculate_statistics(self, video_tensor: torch.Tensor) -> tuple[np.ndarray, np.ndarray]:
    """
    Calculate activation statistics (mean and covariance) from video frames.

    Args:
        video_tensor (torch.Tensor): Video tensor [T, C, H, W].

    Returns:
        Tuple of mean and covariance matrix.
    """
    video_tensor = self.preprocess_tensor(video_tensor).to(self.device)
    with torch.no_grad():
        features = self.model(video_tensor).cpu().numpy()  # Extract 2048-d feature vectors

    mu = features.mean(axis=0)
    sigma = np.cov(features, rowvar=False)
    return mu, sigma

compute_metrics(results)

Compute the final FID score.

Source code in aigve/metrics/video_quality_assessment/distribution_based/fid_metric.py
def compute_metrics(self, results: list) -> Dict[str, float]:
    """Compute the final FID score."""
    scores = np.array([res["FID_Score"] for res in self.results])
    mean_score = np.mean(scores) if scores.size > 0 else 0.0
    print(f"FID mean score: {mean_score:.4f}")

    json_file_path = os.path.join(os.getcwd(), "fid_results.json")
    final_results = {
        "video_results": self.results, 
        "FID_Mean_Score": mean_score
    }
    with open(json_file_path, "w") as json_file:
        json.dump(final_results, json_file, indent=4)
    print(f"FID mean score saved to {json_file_path}")

    return {'FID_Mean_Score': mean_score}

preprocess_tensor(video_tensor)

Resize and normalize a video tensor.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `video_tensor` | `Tensor` | Tensor of shape [T, C, H, W]. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Tensor` | Preprocessed tensor of shape [T, C, H, W]. |

Source code in aigve/metrics/video_quality_assessment/distribution_based/fid_metric.py
def preprocess_tensor(self, video_tensor: torch.Tensor) -> torch.Tensor:
    """
    Resize and normalize a video tensor.

    Args:
        video_tensor (torch.Tensor): Tensor of shape [T, C, H, W].

    Returns:
        torch.Tensor: Preprocessed tensor of shape [T, C, H, W].
    """
    video_tensor = self.transform(video_tensor / 255.0)
    return video_tensor

process(data_batch, data_samples)

Process one batch of data samples and compute FID.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data_batch` | `dict` | A batch of data from the dataloader (not used here). | *required* |
| `data_samples` | `List[Tuple[Tensor], Tuple[Tensor], Tuple[str], Tuple[str]]` | A list containing four tuples: a tuple of `real_tensor` (torch.Tensor), real video tensors [T, C, H, W]; a tuple of `gen_tensor` (torch.Tensor), generated video tensors [T, C, H, W]; a tuple of `real_video_name` (str), ground-truth video filenames; a tuple of `gen_video_name` (str), generated video filenames. The length of each tuple is the batch size. | *required* |

Source code in aigve/metrics/video_quality_assessment/distribution_based/fid_metric.py
def process(self, data_batch: dict, data_samples: Sequence[dict]) -> None:
    """
    Process one batch of data samples and compute FID.

    Args:
        data_batch (dict): A batch of data from the dataloader (not used here).
        data_samples (List[Tuple[torch.Tensor], Tuple[torch.Tensor], Tuple[str], Tuple[str]]):
            A list containing four tuples:
            - A tuple of `real_tensor` (torch.Tensor): Real video tensor [T, C, H, W].
            - A tuple of `gen_tensor` (torch.Tensor): Generated video tensor [T, C, H, W].
            - A tuple of `real_video_name` (str): Ground-truth video filename.
            - A tuple of `gen_video_name` (str): Generated video filename.
            The length of each tuple is the batch size.
    """
    results = []
    real_tensor_tuple, gen_tensor_tuple, real_video_name_tuple, gen_video_name_tuple = data_samples

    batch_size = len(real_tensor_tuple)
    with torch.no_grad():
        for i in range(batch_size):
            real_video_name = real_video_name_tuple[i]
            gen_video_name = gen_video_name_tuple[i]
            real_tensor = real_tensor_tuple[i]
            gen_tensor = gen_tensor_tuple[i]
            fid_score = self.calculate_fid(real_tensor, gen_tensor)

            results.append({
                "Real video_name": real_video_name, 
                "Generated video_name": gen_video_name, 
                "FID_Score": fid_score
            })
            print(f"Processed score {fid_score:.4f} between {real_video_name} and {gen_video_name}")

    self.results.extend(results)
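
A minimal usage sketch (illustrative, not from the AIGVE source, assuming `FIDScore` is imported from the module above): two dummy 8-frame videos stand in for a real/generated pair, and `data_samples` packs the tensors and filenames into four parallel tuples as described above.

```python
import torch

metric = FIDScore(model_name="inception_v3", is_gpu=torch.cuda.is_available())

real = torch.randint(0, 256, (8, 3, 299, 299)).float()  # [T, C, H, W], values in 0-255
fake = torch.randint(0, 256, (8, 3, 299, 299)).float()

metric.process(data_batch={}, data_samples=((real,), (fake,), ("real.mp4",), ("gen.mp4",)))
print(metric.compute_metrics(metric.results))  # {'FID_Mean_Score': ...}; also writes fid_results.json
```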

FVDScore

Bases: BaseMetric

Fréchet Video Distance (FVD) computation using an I3D model. Users should first download the pretrained I3D model from https://github.com/hassony2/kinetics_i3d_pytorch/blob/master/model/model_rgb.pth and then place it in the folder AIGVE_Tool/aigve/metrics/video_quality_assessment/distribution_based/fvd/.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `model_path` | `str` | Path to pre-trained I3D model. | *required* |
| `feature_layer` | `int` | Layer to extract features from. Default is -2 (penultimate layer). | `-2` |
| `is_gpu` | `bool` | Whether to use GPU. Default is True. | `True` |

Source code in aigve/metrics/video_quality_assessment/distribution_based/fvd/fvd_metric.py
@METRICS.register_module()
class FVDScore(BaseMetric):
    """
    Fréchet Video Distance (FVD) computation using I3D model.
    Users can first download the pretrained I3D model from: 
    https://github.com/hassony2/kinetics_i3d_pytorch/blob/master/model/model_rgb.pth
    Then put it in the folder: 
    AIGVE_Tool/aigve/metrics/video_quality_assessment/distribution_based/fvd/

    Args:
        model_path (str): Path to pre-trained I3D model.
        feature_layer (int): Layer to extract features from. Default is -2 (penultimate layer).
        is_gpu (bool): Whether to use GPU. Default is True.
    """
    def __init__(self, 
                 model_path: str, 
                 feature_layer: int = -2, 
                 is_gpu: bool = True):
        super(FVDScore, self).__init__()
        self.device = torch.device("cuda" if is_gpu and torch.cuda.is_available() else "cpu")
        self.model = self.load_i3d_model(model_path, feature_layer)
        self.model.eval()

        self.transform = transforms.Compose([
            transforms.Resize((224, 224)),  # I3D input size
            transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])  # Normalize to [-1, 1]
        ])

    def load_i3d_model(self, model_path: str, feature_layer: int) -> torch.nn.Module:
        """
        Load a pre-trained I3D model and modify it to extract features.

        Args:
            model_path (str): Path to the I3D model checkpoint.
            feature_layer (int): The layer index from which to extract features.

        Returns:
            torch.nn.Module: I3D feature extraction model.
        """
        model = models.video.r3d_18(pretrained=True)  # Using ResNet3D as an I3D alternative
        model.fc = nn.Identity()  # Remove classification head

        if os.path.exists(model_path):
            model.load_state_dict(torch.load(model_path, map_location=self.device))
        else:
            print(f"Warning: Model checkpoint not found at {model_path}, using default weights.")

        return model

    def preprocess_tensor(self, video_tensor: torch.Tensor) -> torch.Tensor:
        """
        Resize and normalize a video tensor.

        Args:
            video_tensor (torch.Tensor): Tensor of shape [T, C, H, W].

        Returns:
            torch.Tensor: Preprocessed tensor of shape [T, C, H, W].
        """
        return self.transform(video_tensor / 255.0)

    def calculate_statistics(self, video_tensor: torch.Tensor) -> tuple[np.ndarray, np.ndarray]:
        """
        Extract activation statistics from video frames.

        Args:
            video_tensor (torch.Tensor): Video tensor [T, C, H, W].

        Returns:
            Tuple[np.ndarray, np.ndarray]: Mean and covariance of extracted features.
        """
        video_tensor = self.preprocess_tensor(video_tensor).to(self.device)
        self.model.to(self.device)
        # Permute to match I3D input format [B, C, T, H, W]
        video_tensor = video_tensor.permute(1, 0, 2, 3).unsqueeze(0)  # Shape: [1, 3, T, H, W]
        with torch.no_grad():
            features = self.model(video_tensor).cpu().numpy()

        # print('features: ', features.shape)
        mu = features.mean(axis=0)
        # Ensure at least 2 samples to compute covariance
        if features.shape[0] > 1:
            sigma = np.cov(features, rowvar=False)
        else:
            sigma = np.zeros((features.shape[1], features.shape[1]))  # Zero-covariance fallback: a single sample cannot estimate covariance
        return mu, sigma

    def calculate_fvd(self, real: torch.Tensor, fake: torch.Tensor) -> float:
        """
        Compute FVD score between real and generated videos.

        Args:
            real (torch.Tensor): Real video tensor [T, C, H, W].
            fake (torch.Tensor): Generated video tensor [T, C, H, W].

        Returns:
            float: FVD score.
        """
        mu1, sigma1 = self.calculate_statistics(real) # Shape[512], Shape[512, 512]
        mu2, sigma2 = self.calculate_statistics(fake)
        # print(f"mu1 shape: {mu1.shape}, sigma1 shape: {sigma1.shape}")
        # print(f"mu2 shape: {mu2.shape}, sigma2 shape: {sigma2.shape}")

        # Ensure sigma matrices are at least 2D
        if sigma1.ndim < 2:
            sigma1 = np.expand_dims(sigma1, axis=0)
        if sigma2.ndim < 2:
            sigma2 = np.expand_dims(sigma2, axis=0)

        ssdiff = np.sum((mu1 - mu2) ** 2.0)
        covmean = sqrtm(sigma1 @ sigma2)

        # Check and correct for imaginary numbers
        if np.iscomplexobj(covmean):
            covmean = covmean.real

        return ssdiff + np.trace(sigma1 + sigma2 - 2.0 * covmean)

    def process(self, data_batch: dict, data_samples: Sequence[dict]) -> None:
        """
        Process a batch of videos and compute FVD.

        Args:
            data_batch (dict): Not used here.
            data_samples (List[Tuple[torch.Tensor], Tuple[torch.Tensor], Tuple[str], Tuple[str]]):
                A list containing four tuples:
                - A tuple of `real_tensor` (torch.Tensor): Real video tensor [T, C, H, W].
                - A tuple of `gen_tensor` (torch.Tensor): Generated video tensor [T, C, H, W].
                - A tuple of `real_video_name` (str): Ground-truth video filename.
                - A tuple of `gen_video_name` (str): Generated video filename.
                The len of each tuples are the batch size.
        """
        results = []
        real_tensor_tuple, gen_tensor_tuple, real_video_name_tuple, gen_video_name_tuple = data_samples

        batch_size = len(real_tensor_tuple)
        with torch.no_grad():
            for i in range(batch_size):
                real_video_name = real_video_name_tuple[i]
                gen_video_name = gen_video_name_tuple[i]
                real_tensor = real_tensor_tuple[i]
                gen_tensor = gen_tensor_tuple[i]

                fvd_score = self.calculate_fvd(real_tensor, gen_tensor)

                results.append({
                    "Real video_name": real_video_name, 
                    "Generated video_name": gen_video_name, 
                    "FVD_Score": fvd_score
                })
                print(f"Processed FVD score {fvd_score:.4f} between {real_video_name} and {gen_video_name}")

        self.results.extend(results)

    def compute_metrics(self, results: list) -> Dict[str, float]:
        """
        Compute the final FVD score.

        Args:
            results (list): List of FVD scores for each batch.

        Returns:
            Dict[str, float]: Dictionary containing mean FVD score.
        """
        scores = np.array([res["FVD_Score"] for res in self.results])
        mean_score = np.mean(scores) if scores.size > 0 else 0.0
        print(f"FVD mean score: {mean_score:.4f}")

        json_file_path = os.path.join(os.getcwd(), "fvd_results.json")
        final_results = {
            "video_results": self.results, 
            "FVD_Mean_Score": mean_score
        }
        with open(json_file_path, "w") as json_file:
            json.dump(final_results, json_file, indent=4)
        print(f"FVD mean score saved to {json_file_path}")

        return {"FVD_Mean_Score": mean_score}

calculate_fvd(real, fake)

Compute FVD score between real and generated videos.

Parameters:

Name Type Description Default
real Tensor

Real video tensor [T, C, H, W].

required
fake Tensor

Generated video tensor [T, C, H, W].

required

Returns:

Name Type Description
float float

FVD score.

Source code in aigve/metrics/video_quality_assessment/distribution_based/fvd/fvd_metric.py
def calculate_fvd(self, real: torch.Tensor, fake: torch.Tensor) -> float:
    """
    Compute FVD score between real and generated videos.

    Args:
        real (torch.Tensor): Real video tensor [T, C, H, W].
        fake (torch.Tensor): Generated video tensor [T, C, H, W].

    Returns:
        float: FVD score.
    """
    mu1, sigma1 = self.calculate_statistics(real) # Shape[512], Shape[512, 512]
    mu2, sigma2 = self.calculate_statistics(fake)
    # print(f"mu1 shape: {mu1.shape}, sigma1 shape: {sigma1.shape}")
    # print(f"mu2 shape: {mu2.shape}, sigma2 shape: {sigma2.shape}")

    # Ensure sigma matrices are at least 2D
    if sigma1.ndim < 2:
        sigma1 = np.expand_dims(sigma1, axis=0)
    if sigma2.ndim < 2:
        sigma2 = np.expand_dims(sigma2, axis=0)

    ssdiff = np.sum((mu1 - mu2) ** 2.0)
    covmean = sqrtm(sigma1 @ sigma2)

    # Check and correct for imaginary numbers
    if np.iscomplexobj(covmean):
        covmean = covmean.real

    return ssdiff + np.trace(sigma1 + sigma2 - 2.0 * covmean)
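
In effect, calculate_fvd fits a Gaussian to the feature activations of each video and returns the Fréchet distance between the two Gaussians, d² = ||μ_real − μ_gen||² + Tr(Σ_real + Σ_gen − 2·(Σ_real·Σ_gen)^(1/2)); any imaginary component of the matrix square root is discarded before the trace is taken.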

calculate_statistics(video_tensor)

Extract activation statistics from video frames.

Parameters:

Name Type Description Default
video_tensor Tensor

Video tensor [T, C, H, W].

required

Returns:

Type Description
tuple[ndarray, ndarray]

Tuple[np.ndarray, np.ndarray]: Mean and covariance of extracted features.

Source code in aigve/metrics/video_quality_assessment/distribution_based/fvd/fvd_metric.py
def calculate_statistics(self, video_tensor: torch.Tensor) -> tuple[np.ndarray, np.ndarray]:
    """
    Extract activation statistics from video frames.

    Args:
        video_tensor (torch.Tensor): Video tensor [T, C, H, W].

    Returns:
        Tuple[np.ndarray, np.ndarray]: Mean and covariance of extracted features.
    """
    video_tensor = self.preprocess_tensor(video_tensor).to(self.device)
    self.model.to(self.device)
    # Permute to match I3D input format [B, C, T, H, W]
    video_tensor = video_tensor.permute(1, 0, 2, 3).unsqueeze(0)  # Shape: [1, 3, T, H, W]
    with torch.no_grad():
        features = self.model(video_tensor).cpu().numpy()

    # print('features: ', features.shape)
    mu = features.mean(axis=0)
    # Ensure at least 2 samples to compute covariance
    if features.shape[0] > 1:
        sigma = np.cov(features, rowvar=False)
    else:
        sigma = np.zeros((features.shape[1], features.shape[1]))  # Zero-covariance fallback: a single sample cannot estimate covariance
    return mu, sigma

compute_metrics(results)

Compute the final FVD score.

Parameters:

Name Type Description Default
results list

List of FVD scores for each batch.

required

Returns:

Type Description
Dict[str, float]

Dict[str, float]: Dictionary containing mean FVD score.

Source code in aigve/metrics/video_quality_assessment/distribution_based/fvd/fvd_metric.py
def compute_metrics(self, results: list) -> Dict[str, float]:
    """
    Compute the final FVD score.

    Args:
        results (list): List of FVD scores for each batch.

    Returns:
        Dict[str, float]: Dictionary containing mean FVD score.
    """
    scores = np.array([res["FVD_Score"] for res in self.results])
    mean_score = np.mean(scores) if scores.size > 0 else 0.0
    print(f"FVD mean score: {mean_score:.4f}")

    json_file_path = os.path.join(os.getcwd(), "fvd_results.json")
    final_results = {
        "video_results": self.results, 
        "FVD_Mean_Score": mean_score
    }
    with open(json_file_path, "w") as json_file:
        json.dump(final_results, json_file, indent=4)
    print(f"FVD mean score saved to {json_file_path}")

    return {"FVD_Mean_Score": mean_score}

load_i3d_model(model_path, feature_layer)

Load a pre-trained I3D model and modify it to extract features.

Parameters:

Name Type Description Default
model_path str

Path to the I3D model checkpoint.

required
feature_layer int

The layer index from which to extract features.

required

Returns:

Type Description
Module

torch.nn.Module: I3D feature extraction model.

Source code in aigve/metrics/video_quality_assessment/distribution_based/fvd/fvd_metric.py
def load_i3d_model(self, model_path: str, feature_layer: int) -> torch.nn.Module:
    """
    Load a pre-trained I3D model and modify it to extract features.

    Args:
        model_path (str): Path to the I3D model checkpoint.
        feature_layer (int): The layer index from which to extract features.

    Returns:
        torch.nn.Module: I3D feature extraction model.
    """
    model = models.video.r3d_18(pretrained=True)  # Using ResNet3D as an I3D alternative
    model.fc = nn.Identity()  # Remove classification head

    if os.path.exists(model_path):
        model.load_state_dict(torch.load(model_path, map_location=self.device))
    else:
        print(f"Warning: Model checkpoint not found at {model_path}, using default weights.")

    return model

preprocess_tensor(video_tensor)

Resize and normalize a video tensor.

Parameters:

Name Type Description Default
video_tensor Tensor

Tensor of shape [T, C, H, W].

required

Returns:

Type Description
Tensor

torch.Tensor: Preprocessed tensor of shape [T, C, H, W].

Source code in aigve/metrics/video_quality_assessment/distribution_based/fvd/fvd_metric.py
def preprocess_tensor(self, video_tensor: torch.Tensor) -> torch.Tensor:
    """
    Resize and normalize a video tensor.

    Args:
        video_tensor (torch.Tensor): Tensor of shape [T, C, H, W].

    Returns:
        torch.Tensor: Preprocessed tensor of shape [T, C, H, W].
    """
    return self.transform(video_tensor / 255.0)

process(data_batch, data_samples)

Process a batch of videos and compute FVD.

Parameters:

Name Type Description Default
data_batch dict

Not used here.

required
data_samples List[Tuple[Tensor], Tuple[Tensor], Tuple[str], Tuple[str]]

A list containing four tuples: - A tuple of real_tensor (torch.Tensor): Real video tensor [T, C, H, W]. - A tuple of gen_tensor (torch.Tensor): Generated video tensor [T, C, H, W]. - A tuple of real_video_name (str): Ground-truth video filename. - A tuple of gen_video_name (str): Generated video filename. The length of each tuple is the batch size.

required
Source code in aigve/metrics/video_quality_assessment/distribution_based/fvd/fvd_metric.py
def process(self, data_batch: dict, data_samples: Sequence[dict]) -> None:
    """
    Process a batch of videos and compute FVD.

    Args:
        data_batch (dict): Not used here.
        data_samples (List[Tuple[torch.Tensor], Tuple[torch.Tensor], Tuple[str], Tuple[str]]):
            A list containing four tuples:
            - A tuple of `real_tensor` (torch.Tensor): Real video tensor [T, C, H, W].
            - A tuple of `gen_tensor` (torch.Tensor): Generated video tensor [T, C, H, W].
            - A tuple of `real_video_name` (str): Ground-truth video filename.
            - A tuple of `gen_video_name` (str): Generated video filename.
            The length of each tuple is the batch size.
    """
    results = []
    real_tensor_tuple, gen_tensor_tuple, real_video_name_tuple, gen_video_name_tuple = data_samples

    batch_size = len(real_tensor_tuple)
    with torch.no_grad():
        for i in range(batch_size):
            real_video_name = real_video_name_tuple[i]
            gen_video_name = gen_video_name_tuple[i]
            real_tensor = real_tensor_tuple[i]
            gen_tensor = gen_tensor_tuple[i]

            fvd_score = self.calculate_fvd(real_tensor, gen_tensor)

            results.append({
                "Real video_name": real_video_name, 
                "Generated video_name": gen_video_name, 
                "FVD_Score": fvd_score
            })
            print(f"Processed FVD score {fvd_score:.4f} between {real_video_name} and {gen_video_name}")

    self.results.extend(results)

GstVqa

Bases: BaseMetric

GstVQA metric modified for the toy dataset. (Supporting 2944-dim features).

Source code in aigve/metrics/video_quality_assessment/nn_based/gstvqa/gstvqa_metric.py
@METRICS.register_module()
class GstVqa(BaseMetric):
    """GstVQA metric modified for the toy dataset. (Supporting 2944-dim features)."""

    def __init__(self, model_path: str):
        super(GstVqa, self).__init__()
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.submodel_path = os.path.join(os.getcwd(), 'metrics/video_quality_assessment/nn_based/gstvqa')
        if not submodule_exists(self.submodel_path):
            add_git_submodule(
                repo_url='https://github.com/Baoliang93/GSTVQA.git', 
                submodule_path=self.submodel_path
            )
        from .GSTVQA.TCSVT_Release.GVQA_Release.GVQA_Cross.cross_test import GSTVQA as GSTVQA_model
        self.model = GSTVQA_model().to(self.device)
        self.model.load_state_dict(torch.load(model_path, map_location=self.device))
        self.model.eval()
        # self.criterion = nn.L1Loss().to(self.device)

    def compute_stat_features(self, features: torch.Tensor, num_valid_frames: int) -> Tuple[torch.Tensor]:
        """Compute statistical features mean_var, std_var, mean_mean, std_mean from extracted deep features.

        Args:
            features (torch.Tensor): Tensor of shape [T, 2944].
            num_valid_frames (int): Number of valid frames before padding.

        Returns:
            Tuple[torch.Tensor]: (mean_var, std_var, mean_mean, std_mean), each of shape [1472].
        """
        # Ignore padded frames
        features = features[:num_valid_frames]  # Shape: [num_valid_frames, feature_dim], e.g., [10, 2944]

        if num_valid_frames == 0:  # Edge case: all frames were padded
            return (
                torch.zeros(1472, device=self.device),
                torch.zeros(1472, device=self.device),
                torch.zeros(1472, device=self.device),
                torch.zeros(1472, device=self.device),
            )

        # Split into mean and std components
        mean_features = features[:, :1472]  # First 1472 features are mean-based
        std_features = features[:, 1472:]   # Last 1472 features are std-based

        # Compute per-feature statistics over frames
        mean_mean = mean_features.mean(dim=0)  # Shape: [1472]
        std_mean = std_features.mean(dim=0)    # Shape: [1472]
        mean_var = mean_features.var(dim=0, unbiased=False)  # Shape: [1472]
        std_var = std_features.var(dim=0, unbiased=False)    # Shape: [1472]

        return mean_var, std_var, mean_mean, std_mean

    def process(self, data_batch: Sequence, data_samples: Sequence) -> None:
        """
        Process a batch of extracted deep features for GSTVQA evaluation and store results in a JSON file.

        Args:
            data_batch (Sequence): A batch of data from the dataloader (not used here).
            data_samples (List[Tuple[torch.Tensor], Tuple[int], Tuple[str]]):
                A list containing three tuples:
                - A tuple of `deep_features`: Each item is a Tensor of shape [T, 2944].
                - A tuple of `num_frames`: Each item is an integer representing the number of valid frames.
                - A tuple of `video_name`: Each item is a string representing the file name for the video.
                The length of each of the three tuples is the batch size.
        """
        # data_samples an example: [
        #     (tensor([[0., 0., 0.,  ..., 0., 0., 0.],
        #              [0., 0., 0.,  ..., 0., 0., 0.],
        #              ...
        #              [0., 0., 0.,  ..., 0., 0., 0.]]), 
        #      tensor([[0., 0., 0.,  ..., 0., 0., 0.],
        #              [0., 0., 0.,  ..., 0., 0., 0.],
        #              ...
        #              [0., 0., 0.,  ..., 0., 0., 0.]])), 
        #     (10, 10)
        # ]
        results = []
        deep_features_tuple, num_frames_tuple, video_name_tuple = data_samples
        with torch.no_grad():
            for deep_features, num_valid_frames, video_name in zip(deep_features_tuple, num_frames_tuple, video_name_tuple):
                if not isinstance(deep_features, torch.Tensor) or not isinstance(num_valid_frames, int):
                    raise TypeError("Expected deep_features to be a torch.Tensor and num_valid_frames to be an int.")

                if num_valid_frames == 0:  # Edge case: No valid frames
                    results.append({"video_name": 'N/A', "GSTVQA_Score": 0.0})
                    continue

                # Remove padded features
                features = deep_features[:num_valid_frames].to(self.device)

                # Compute statistical features only on valid frames
                mean_var, std_var, mean_mean, std_mean = self.compute_stat_features(features, num_valid_frames)
                mean_var, std_var, mean_mean, std_mean = (
                    mean_var.to(self.device),
                    std_var.to(self.device),
                    mean_mean.to(self.device),
                    std_mean.to(self.device),
                )

                # Length tensor indicating the number of valid frames
                length = torch.tensor([num_valid_frames]).to(self.device)
                # print('features(input) shape', features.unsqueeze(1).shape) # torch.Size([10, 1, 1472])
                # print('input_length shape', length.shape) # torch.Size([1])
                # print('input_length', length) # torch.Size([1])
                # print('mean_mean shape', mean_mean.shape) # torch.Size([1472])
                # print('std_mean shape', std_mean.shape) # torch.Size([1472])
                # print('mean_var shape', mean_var.shape) # torch.Size([1472])
                # print('std_var shape', std_var.shape) # torch.Size([1472])

                # Run GSTVQA model
                outputs = self.model(features.unsqueeze(1), length, mean_var, std_var, mean_mean, std_mean)
                score = outputs.item()
                results.append({"video_name": video_name, "GSTVQA_Score": score})
                # print(f"Processed score {score:.4f} for {video_name}")

        self.results.extend(results)


    def compute_metrics(self, results: list) -> Dict[str, float]:
        """Compute final GSTVQA-based metrics."""
        scores = np.array([res['GSTVQA_Score'] for res in self.results])
        mean_score = np.mean(scores)
        print(f"GSTVQA mean score: {mean_score:.4f}")

        json_file_path = os.path.join(os.getcwd(), "gstvqa_results.json")
        final_results = {"video_results": self.results, "GSTVQA_Mean_Score": mean_score}
        with open(json_file_path, "w") as json_file:
            json.dump(final_results, json_file, indent=4)
        print(f"GSTVQA mean score saved to {json_file_path}")

        return {'GSTVQA_Mean_Score': mean_score}

compute_metrics(results)

Compute final GSTVQA-based metrics.

Source code in aigve/metrics/video_quality_assessment/nn_based/gstvqa/gstvqa_metric.py
def compute_metrics(self, results: list) -> Dict[str, float]:
    """Compute final GSTVQA-based metrics."""
    scores = np.array([res['GSTVQA_Score'] for res in self.results])
    mean_score = np.mean(scores)
    print(f"GSTVQA mean score: {mean_score:.4f}")

    json_file_path = os.path.join(os.getcwd(), "gstvqa_results.json")
    final_results = {"video_results": self.results, "GSTVQA_Mean_Score": mean_score}
    with open(json_file_path, "w") as json_file:
        json.dump(final_results, json_file, indent=4)
    print(f"GSTVQA mean score saved to {json_file_path}")

    return {'GSTVQA_Mean_Score': mean_score}

compute_stat_features(features, num_valid_frames)

Compute statistical features mean_var, std_var, mean_mean, std_mean from extracted deep features.

Parameters:

Name Type Description Default
features Tensor

Tensor of shape [T, 2944].

required
num_valid_frames int

Number of valid frames before padding.

required

Returns:

Type Description
Tuple[Tensor]

Tuple[torch.Tensor]: (mean_var, std_var, mean_mean, std_mean), each of shape [1472].

Source code in aigve/metrics/video_quality_assessment/nn_based/gstvqa/gstvqa_metric.py
def compute_stat_features(self, features: torch.Tensor, num_valid_frames: int) -> Tuple[torch.Tensor]:
    """Compute statistical features mean_var, std_var, mean_mean, std_mean from extracted deep features.

    Args:
        features (torch.Tensor): Tensor of shape [T, 2944].
        num_valid_frames (int): Number of valid frames before padding.

    Returns:
        Tuple[torch.Tensor]: (mean_var, std_var, mean_mean, std_mean), each of shape [1472].
    """
    # Ignore padded frames
    features = features[:num_valid_frames]  # Shape: [num_valid_frames, feature_dim], e.g., [10, 2944]

    if num_valid_frames == 0:  # Edge case: all frames were padded
        return (
            torch.zeros(1472, device=self.device),
            torch.zeros(1472, device=self.device),
            torch.zeros(1472, device=self.device),
            torch.zeros(1472, device=self.device),
        )

    # Split into mean and std components
    mean_features = features[:, :1472]  # First 1472 features are mean-based
    std_features = features[:, 1472:]   # Last 1472 features are std-based

    # Compute per-feature statistics over frames
    mean_mean = mean_features.mean(dim=0)  # Shape: [1472]
    std_mean = std_features.mean(dim=0)    # Shape: [1472]
    mean_var = mean_features.var(dim=0, unbiased=False)  # Shape: [1472]
    std_var = std_features.var(dim=0, unbiased=False)    # Shape: [1472]

    return mean_var, std_var, mean_mean, std_mean
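
A minimal shape sketch (the feature values are placeholders, and gstvqa_metric stands for a constructed GstVqa instance):

import torch

feats = torch.rand(10, 2944)   # [T, 2944] deep features for a single video
# mean_var, std_var, mean_mean, std_mean = gstvqa_metric.compute_stat_features(feats, num_valid_frames=10)
# Each of the four returned tensors has shape [1472].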

process(data_batch, data_samples)

Process a batch of extracted deep features for GSTVQA evaluation and store results in a JSON file.

Parameters:

Name Type Description Default
data_batch Sequence

A batch of data from the dataloader (not used here).

required
data_samples List[Tuple[Tensor], Tuple[int], Tuple[str]]

A list containing three tuples: - A tuple of deep_features: Each item is a Tensor of shape [T, 2944]. - A tuple of num_frames: Each item is an integer representing the number of valid frames. - A tuple of video_name: Each item is a string representing the file name for the video. The length of each of the three tuples is the batch size.

required
Source code in aigve/metrics/video_quality_assessment/nn_based/gstvqa/gstvqa_metric.py
def process(self, data_batch: Sequence, data_samples: Sequence) -> None:
    """
    Process a batch of extracted deep features for GSTVQA evaluation and store results in a JSON file.

    Args:
        data_batch (Sequence): A batch of data from the dataloader (not used here).
        data_samples (List[Tuple[torch.Tensor], Tuple[int], Tuple[str]]):
            A list containing three tuples:
            - A tuple of `deep_features`: Each item is a Tensor of shape [T, 2944].
            - A tuple of `num_frames`: Each item is an integer representing the number of valid frames.
            - A tuple of `video_name`: Each item is a string representing the file name for the video.
            The length of each of the three tuples is the batch size.
    """
    # data_samples an example: [
    #     (tensor([[0., 0., 0.,  ..., 0., 0., 0.],
    #              [0., 0., 0.,  ..., 0., 0., 0.],
    #              ...
    #              [0., 0., 0.,  ..., 0., 0., 0.]]), 
    #      tensor([[0., 0., 0.,  ..., 0., 0., 0.],
    #              [0., 0., 0.,  ..., 0., 0., 0.],
    #              ...
    #              [0., 0., 0.,  ..., 0., 0., 0.]])), 
    #     (10, 10)
    # ]
    results = []
    deep_features_tuple, num_frames_tuple, video_name_tuple = data_samples
    with torch.no_grad():
        for deep_features, num_valid_frames, video_name in zip(deep_features_tuple, num_frames_tuple, video_name_tuple):
            if not isinstance(deep_features, torch.Tensor) or not isinstance(num_valid_frames, int):
                raise TypeError("Expected deep_features to be a torch.Tensor and num_valid_frames to be an int.")

            if num_valid_frames == 0:  # Edge case: No valid frames
                results.append({"video_name": 'N/A', "GSTVQA_Score": 0.0})
                continue

            # Remove padded features
            features = deep_features[:num_valid_frames].to(self.device)

            # Compute statistical features only on valid frames
            mean_var, std_var, mean_mean, std_mean = self.compute_stat_features(features, num_valid_frames)
            mean_var, std_var, mean_mean, std_mean = (
                mean_var.to(self.device),
                std_var.to(self.device),
                mean_mean.to(self.device),
                std_mean.to(self.device),
            )

            # Length tensor indicating the number of valid frames
            length = torch.tensor([num_valid_frames]).to(self.device)
            # print('features(input) shape', features.unsqueeze(1).shape) # torch.Size([10, 1, 1472])
            # print('input_length shape', length.shape) # torch.Size([1])
            # print('input_length', length) # torch.Size([1])
            # print('mean_mean shape', mean_mean.shape) # torch.Size([1472])
            # print('std_mean shape', std_mean.shape) # torch.Size([1472])
            # print('mean_var shape', mean_var.shape) # torch.Size([1472])
            # print('std_var shape', std_var.shape) # torch.Size([1472])

            # Run GSTVQA model
            outputs = self.model(features.unsqueeze(1), length, mean_var, std_var, mean_mean, std_mean)
            score = outputs.item()
            results.append({"video_name": video_name, "GSTVQA_Score": score})
            # print(f"Processed score {score:.4f} for {video_name}")

    self.results.extend(results)
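
Putting the pieces together, process expects data_samples laid out as below (a sketch with placeholder values; gstvqa_metric stands for a constructed GstVqa instance):

import torch

deep_features = (torch.rand(10, 2944),)   # padded feature matrix [T, 2944] for one video
num_frames    = (10,)                     # number of valid (non-padded) frames
video_names   = ("video_000.mp4",)        # hypothetical file name
# gstvqa_metric.process(data_batch=[], data_samples=[deep_features, num_frames, video_names])
# gstvqa_metric.compute_metrics(gstvqa_metric.results)   # also writes gstvqa_results.json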

ISScore

Bases: BaseMetric

Inception Score (IS) implementation.

The Inception Score measures the quality and diversity of generated images by evaluating the KL divergence between the conditional class distribution and the marginal class distribution.

Parameters:

Name Type Description Default
model_name str

Name of the model to use. Currently only 'inception_v3' is supported.

'inception_v3'
input_shape tuple

Input shape for the model (height, width, channels).

(299, 299, 3)
splits int

Number of splits to use when calculating the score.

10
is_gpu bool

Whether to use GPU. Defaults to True.

True
Source code in aigve/metrics/video_quality_assessment/distribution_based/is_score_metric.py
@METRICS.register_module()
class ISScore(BaseMetric):
    """
    Inception Score (IS) implementation.

    The Inception Score measures the quality and diversity of generated images
    by evaluating the KL divergence between the conditional class distribution
    and the marginal class distribution.

    Args:
        model_name (str): Name of the model to use. Currently only 'inception_v3' is supported.
        input_shape (tuple): Input shape for the model (height, width, channels).
        splits (int): Number of splits to use when calculating the score.
        is_gpu (bool): Whether to use GPU. Defaults to True.
    """

    def __init__(
            self, 
            model_name: str = 'inception_v3', 
            input_shape: tuple = (299, 299, 3), 
            splits: int = 10,
            is_gpu: bool = True):
        super(ISScore, self).__init__()
        self.device = torch.device("cuda" if is_gpu and torch.cuda.is_available() else "cpu")
        self.splits = splits

        if model_name == 'inception_v3':
            self.model = models.inception_v3(pretrained=True, transform_input=False, aux_logits=True)
            self.model.eval().to(self.device)
        else:
            raise ValueError(f"Model '{model_name}' is not supported for Inception Score computation.")

    def preprocess_tensor(self, images: torch.Tensor) -> torch.Tensor:
        """
        Resize and normalize images.

        Args:
            images (torch.Tensor): Tensor of shape [B, C, H, W].

        Returns:
            torch.Tensor: Preprocessed images.
        """
        images = nn.functional.interpolate(images, size=(299, 299), mode='bilinear', align_corners=False)
        mean = torch.tensor([0.485, 0.456, 0.406], device=images.device).view(1, -1, 1, 1)
        std = torch.tensor([0.229, 0.224, 0.225], device=images.device).view(1, -1, 1, 1)
        images = (images - mean) / std
        return images

    def compute_inception_features(self, images: torch.Tensor) -> torch.Tensor:
        """
        Compute Inception features for a batch of images.

        Args:
            images (torch.Tensor): Preprocessed image tensor.

        Returns:
            torch.Tensor: Feature activations from InceptionV3.
        """
        images = self.preprocess_tensor(images).to(self.device)
        with torch.no_grad():
            output = self.model(images)
            if isinstance(output, tuple):
                output = output[0]
        return output.cpu()

    def calculate_is(self, preds: np.ndarray) -> float:
        """
        Calculate the Inception Score (IS) for a set of predicted class probabilities.

        Args:
            preds (np.ndarray): Array of predicted softmax probabilities with shape [N, num_classes].

        Returns:
            (float): Inception Score.
        """
        kl = preds * (np.log(preds + 1e-10) - np.log(np.expand_dims(np.mean(preds, axis=0), 0) + 1e-10))
        kl_mean = np.mean(np.sum(kl, axis=1))
        return float(np.exp(kl_mean))

    def process(self, data_batch: dict, data_samples: Sequence[dict]) -> None:
        """
        Process one batch of data samples and compute IS.

        Args:
            data_batch (dict): A batch of data from the dataloader (not used here).
            data_samples (List[Tuple[torch.Tensor], Tuple[torch.Tensor], Tuple[str], Tuple[str]]):
                A list containing four tuples:
                - A tuple of `real_tensor` (torch.Tensor): Real video tensor [T, C, H, W].
                - A tuple of `gen_tensor` (torch.Tensor): Generated video tensor [T, C, H, W].
                - A tuple of `real_video_name` (str): Ground-truth video filename.
                - A tuple of `gen_video_name` (str): Generated video filename.
                The length of each tuple is the batch size.
        """
        results = []
        real_tensor_tuple, gen_tensor_tuple, real_video_name_tuple, gen_video_name_tuple = data_samples

        batch_size = len(gen_tensor_tuple)
        with torch.no_grad():
            for i in range(batch_size):
                gen_video_name = gen_video_name_tuple[i]
                gen_tensor = gen_tensor_tuple[i]

                logits = self.compute_inception_features(gen_tensor)
                preds = torch.nn.functional.softmax(logits, dim=1).numpy()
                is_score = self.calculate_is(preds)

                results.append({
                    "Generated video_name": gen_video_name, 
                    "IS_Score": is_score,
                })
                print(f"Processed IS score {is_score:.4f} for {gen_video_name}")

        self.results.extend(results)

    def compute_metrics(self, results: list) -> Dict[str, float]:
        """
        Compute the final IS score.

        Args:
            results (list): List of IS scores for each batch.

        Returns:
            Dict[str, float]: Dictionary containing the mean IS score.
        """
        scores = np.array([res["IS_Score"] for res in self.results])

        mean_score = np.mean(scores) if scores.size > 0 else 0.0

        print(f"IS mean score: {mean_score:.4f}")

        json_file_path = os.path.join(os.getcwd(), "is_results.json")
        final_results = {
            "video_results": self.results, 
            "IS_Mean_Score": mean_score, 
        }
        with open(json_file_path, "w") as json_file:
            json.dump(final_results, json_file, indent=4)
        print(f"IS mean score saved to {json_file_path}")

        return {"IS_Mean_Score": mean_score}

calculate_is(preds)

Calculate the Inception Score (IS) for a set of predicted class probabilities.

Parameters:

Name Type Description Default
preds ndarray

Array of predicted softmax probabilities with shape [N, num_classes].

required

Returns:

Type Description
float

Inception Score.

Source code in aigve/metrics/video_quality_assessment/distribution_based/is_score_metric.py
def calculate_is(self, preds: np.ndarray) -> float:
    """
    Calculate the Inception Score (IS) for a set of predicted class probabilities.

    Args:
        preds (np.ndarray): Array of predicted softmax probabilities with shape [N, num_classes].

    Returns:
        (float): Inception Score.
    """
    kl = preds * (np.log(preds + 1e-10) - np.log(np.expand_dims(np.mean(preds, axis=0), 0) + 1e-10))
    kl_mean = np.mean(np.sum(kl, axis=1))
    return float(np.exp(kl_mean))

compute_inception_features(images)

Compute Inception features for a batch of images.

Parameters:

Name Type Description Default
images Tensor

Preprocessed image tensor.

required

Returns:

Type Description
Tensor

torch.Tensor: Feature activations from InceptionV3.

Source code in aigve/metrics/video_quality_assessment/distribution_based/is_score_metric.py
def compute_inception_features(self, images: torch.Tensor) -> torch.Tensor:
    """
    Compute Inception features for a batch of images.

    Args:
        images (torch.Tensor): Preprocessed image tensor.

    Returns:
        torch.Tensor: Feature activations from InceptionV3.
    """
    images = self.preprocess_tensor(images).to(self.device)
    with torch.no_grad():
        output = self.model(images)
        if isinstance(output, tuple):
            output = output[0]
    return output.cpu()

compute_metrics(results)

Compute the final IS score.

Parameters:

Name Type Description Default
results list

List of IS scores for each batch.

required

Returns:

Type Description
Dict[str, float]

Dict[str, float]: Dictionary containing the mean IS score.

Source code in aigve/metrics/video_quality_assessment/distribution_based/is_score_metric.py
def compute_metrics(self, results: list) -> Dict[str, float]:
    """
    Compute the final IS score.

    Args:
        results (list): List of IS scores for each batch.

    Returns:
        Dict[str, float]: Dictionary containing the mean IS score.
    """
    scores = np.array([res["IS_Score"] for res in self.results])

    mean_score = np.mean(scores) if scores.size > 0 else 0.0

    print(f"IS mean score: {mean_score:.4f}")

    json_file_path = os.path.join(os.getcwd(), "is_results.json")
    final_results = {
        "video_results": self.results, 
        "IS_Mean_Score": mean_score, 
    }
    with open(json_file_path, "w") as json_file:
        json.dump(final_results, json_file, indent=4)
    print(f"IS mean score saved to {json_file_path}")

    return {"IS_Mean_Score": mean_score}

preprocess_tensor(images)

Resize and normalize images.

Parameters:

Name Type Description Default
images Tensor

Tensor of shape [B, C, H, W].

required

Returns:

Type Description
Tensor

torch.Tensor: Preprocessed images.

Source code in aigve/metrics/video_quality_assessment/distribution_based/is_score_metric.py
def preprocess_tensor(self, images: torch.Tensor) -> torch.Tensor:
    """
    Resize and normalize images.

    Args:
        images (torch.Tensor): Tensor of shape [B, C, H, W].

    Returns:
        torch.Tensor: Preprocessed images.
    """
    images = nn.functional.interpolate(images, size=(299, 299), mode='bilinear', align_corners=False)
    mean = torch.tensor([0.485, 0.456, 0.406], device=images.device).view(1, -1, 1, 1)
    std = torch.tensor([0.229, 0.224, 0.225], device=images.device).view(1, -1, 1, 1)
    images = (images - mean) / std
    return images

process(data_batch, data_samples)

Process one batch of data samples and compute IS.

Parameters:

Name Type Description Default
data_batch dict

A batch of data from the dataloader (not used here).

required
data_samples List[Tuple[Tensor], Tuple[Tensor], Tuple[str], Tuple[str]]

A list containing four tuples: - A tuple of real_tensor (torch.Tensor): Real video tensor [T, C, H, W]. - A tuple of gen_tensor (torch.Tensor): Generated video tensor [T, C, H, W]. - A tuple of real_video_name (str): Ground-truth video filename. - A tuple of gen_video_name (str): Generated video filename. The length of each tuple is the batch size.

required
Source code in aigve/metrics/video_quality_assessment/distribution_based/is_score_metric.py
def process(self, data_batch: dict, data_samples: Sequence[dict]) -> None:
    """
    Process one batch of data samples and compute IS.

    Args:
        data_batch (dict): A batch of data from the dataloader (not used here).
        data_samples (List[Tuple[torch.Tensor], Tuple[torch.Tensor], Tuple[str], Tuple[str]]):
            A list containing four tuples:
            - A tuple of `real_tensor` (torch.Tensor): Real video tensor [T, C, H, W].
            - A tuple of `gen_tensor` (torch.Tensor): Generated video tensor [T, C, H, W].
            - A tuple of `real_video_name` (str): Ground-truth video filename.
            - A tuple of `gen_video_name` (str): Generated video filename.
            The length of each tuple is the batch size.
    """
    results = []
    real_tensor_tuple, gen_tensor_tuple, real_video_name_tuple, gen_video_name_tuple = data_samples

    batch_size = len(gen_tensor_tuple)
    with torch.no_grad():
        for i in range(batch_size):
            gen_video_name = gen_video_name_tuple[i]
            gen_tensor = gen_tensor_tuple[i]

            logits = self.compute_inception_features(gen_tensor)
            preds = torch.nn.functional.softmax(logits, dim=1).numpy()
            is_score = self.calculate_is(preds)

            results.append({
                "Generated video_name": gen_video_name, 
                "IS_Score": is_score,
            })
            print(f"Processed IS score {is_score:.4f} for {gen_video_name}")

    self.results.extend(results)
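
calculate_is implements the usual Inception Score: the exponential of the mean KL divergence between each conditional class distribution p(y|x) and the marginal distribution p(y). A small self-contained sketch with toy probabilities over three hypothetical classes (is_metric stands for a constructed ISScore instance):

import numpy as np

# Three confident, diverse predictions over three hypothetical classes:
preds = np.array([
    [0.90, 0.05, 0.05],
    [0.05, 0.90, 0.05],
    [0.05, 0.05, 0.90],
])
# is_metric = ISScore(is_gpu=False)       # downloads InceptionV3 weights on first use
# print(is_metric.calculate_is(preds))    # confident and diverse -> score close to the number of classes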

LightVQAPlus

Bases: BaseMetric

LightVQA+ metric for evaluating video quality.

Source code in aigve/metrics/video_quality_assessment/nn_based/lightvqa_plus/lightvqa_plus_metric.py
@METRICS.register_module()
class LightVQAPlus(BaseMetric):
    """LightVQA+ metric for evaluating video quality."""

    def __init__(self, model_path: str, swin_weights: str, is_gpu: bool = True):
        super(LightVQAPlus, self).__init__()
        self.model_path = model_path
        self.swin_weights = swin_weights
        self.device = torch.device("cuda" if is_gpu else "cpu")

        self.submodel_path = os.path.join(os.getcwd(), 'metrics/video_quality_assessment/nn_based/lightvqa_plus')
        if not submodule_exists(self.submodel_path):
            add_git_submodule(
                repo_url='https://github.com/SaMMyCHoo/Light-VQA-plus.git', 
                submodule_path=self.submodel_path
            )
        lightvqa_path = os.path.join(self.submodel_path, "Light_VQA_plus")
        if lightvqa_path not in sys.path:
            sys.path.insert(0, lightvqa_path)

        from .Light_VQA_plus.final_fusion_model import swin_small_patch4_window7_224 as create_model
        self.model = create_model().to(self.device)

        weights_dict = torch.load(os.path.join(os.getcwd(), self.model_path), map_location=self.device)
        print(self.model.load_state_dict(weights_dict))

        self.model.eval()

    def process(self, data_batch: list, data_samples: list) -> None:
        """
        Process a batch of extracted deep features for LightVQA+ evaluation.
        Args:
            data_batch (Sequence): A batch of data from the dataloader (not used here).
            data_samples (List[Tuple[torch.Tensor], Tuple[torch.Tensor], Tuple[torch.Tensor], Tuple[torch.Tensor], Tuple[str]]):
                A list containing five tuples:
                - spatial_features (torch.Tensor): Eight evenly spaced key frames. Shape: [8, 3, 672, 1120].
                - temporal_features (torch.Tensor): Motion features from SlowFast. Shape: [1, feature_dim(2304)].
                - bns_features (torch.Tensor): Brightness & Noise features. Shape: [8, 300].
                - bc_features (torch.Tensor): Temporal brightness contrast features. Shape: [8, final_dim(20)].
                - video_name (str): Video filename.
                The length of each tuple is the batch size.
        """
        results = []
        spatial_features_tuple, temporal_features_tuple, bns_features_tuple, bc_features_tuple, video_name_tuple = data_samples
        # print('spatial_features_tuple len: ', len(spatial_features_tuple)) # B
        # print('spatial_features_tuple[0]: ', spatial_features_tuple[0].shape) # torch.Size([8, 3, 672, 1120])
        # print('temporal_features_tuple[0]: ', temporal_features_tuple[0].shape) # torch.Size([1, 2304])
        # print('bns_features_tuple[0]: ', bns_features_tuple[0].shape) # torch.Size([8, 300])
        # print('bc_features_tuple[0]: ', bc_features_tuple[0].shape) # torch.Size([8, 20])

        batch_size = len(spatial_features_tuple)
        with torch.no_grad():
            for i in range(batch_size):
                video_name = video_name_tuple[i]
                spatial_features = spatial_features_tuple[i].to(self.device) # torch.Size([8, 3, 672, 1120])
                temporal_features = temporal_features_tuple[i].to(self.device) # torch.Size([1, 2304])
                bns_features = bns_features_tuple[i].to(self.device) # torch.Size([8, 300])
                bc_features = bc_features_tuple[i].to(self.device)  # Shape: [8, final_dim(20)]

                concat_features = torch.cat([temporal_features, bc_features.view(1, -1)], dim=1) # torch.Size([1, 2304+8*20])
                # print('concat_features: ', concat_features.shape) # torch.Size([1, 2464])
                final_temporal_features = F.pad(concat_features, (0, 2604 - concat_features.shape[1]), mode="constant", value=0) # torch.Size([1, 2604])
                # print('final_temporal_features: ', final_temporal_features.shape) # torch.Size([1, 2604])

                outputs = self.model(spatial_features, final_temporal_features, bns_features)
                # print('outputs: ', outputs)
                score = outputs.mean().item()

                results.append({"video_name": video_name, "LightVQAPlus_Score": score})
                print(f"Processed score {score:.4f} for {video_name}")

        self.results.extend(results)

    def compute_metrics(self, results: list) -> Dict[str, float]:
        """Compute final LightVQA+ metrics."""
        scores = np.array([res["LightVQAPlus_Score"] for res in self.results])
        mean_score = np.mean(scores) if scores.size > 0 else 0.0
        print(f"LightVQA+ mean score: {mean_score:.4f}")

        json_file_path = os.path.join(os.getcwd(), "lightvqaplus_results.json")
        final_results = {"video_results": self.results, "LightVQAPlus_Mean_Score": mean_score}
        with open(json_file_path, "w") as json_file:
            json.dump(final_results, json_file, indent=4)
        print(f"LightVQA+ mean score saved to {json_file_path}")

        return {"LightVQAPlus_Mean_Score": mean_score}

compute_metrics(results)

Compute final LightVQA+ metrics.

Source code in aigve/metrics/video_quality_assessment/nn_based/lightvqa_plus/lightvqa_plus_metric.py
def compute_metrics(self, results: list) -> Dict[str, float]:
    """Compute final LightVQA+ metrics."""
    scores = np.array([res["LightVQAPlus_Score"] for res in self.results])
    mean_score = np.mean(scores) if scores.size > 0 else 0.0
    print(f"LightVQA+ mean score: {mean_score:.4f}")

    json_file_path = os.path.join(os.getcwd(), "lightvqaplus_results.json")
    final_results = {"video_results": self.results, "LightVQAPlus_Mean_Score": mean_score}
    with open(json_file_path, "w") as json_file:
        json.dump(final_results, json_file, indent=4)
    print(f"LightVQA+ mean score saved to {json_file_path}")

    return {"LightVQAPlus_Mean_Score": mean_score}

process(data_batch, data_samples)

Process a batch of extracted deep features for LightVQA+ evaluation.

Parameters:

Name Type Description Default
data_batch Sequence

A batch of data from the dataloader (not used here).

required
data_samples List[Tuple[Tensor], Tuple[Tensor], Tuple[Tensor], Tuple[Tensor], Tuple[str]]

A list containing five tuples: - spatial_features (torch.Tensor): Eight evenly spaced key frames. Shape: [8, 3, 672, 1120]. - temporal_features (torch.Tensor): Motion features from SlowFast. Shape: [1, feature_dim(2304)]. - bns_features (torch.Tensor): Brightness & Noise features. Shape: [8, 300]. - bc_features (torch.Tensor): Temporal brightness contrast features. Shape: [8, final_dim(20)]. - video_name (str): Video filename. The length of each tuple is the batch size.

required

Source code in aigve/metrics/video_quality_assessment/nn_based/lightvqa_plus/lightvqa_plus_metric.py
def process(self, data_batch: list, data_samples: list) -> None:
    """
    Process a batch of extracted deep features for LightVQA+ evaluation.
    Args:
        data_batch (Sequence): A batch of data from the dataloader (not used here).
        data_samples (List[Tuple[torch.Tensor], Tuple[torch.Tensor], Tuple[torch.Tensor], Tuple[torch.Tensor], Tuple[str]]):
            A list containing five tuples:
            - spatial_features (torch.Tensor): Eight evenly spaced key frames. Shape: [8, 3, 672, 1120].
            - temporal_features (torch.Tensor): Motion features from SlowFast. Shape: [1, feature_dim(2304)].
            - bns_features (torch.Tensor): Brightness & Noise features. Shape: [8, 300].
            - bc_features (torch.Tensor): Temporal brightness contrast features. Shape: [8, final_dim(20)].
            - video_name (str): Video filename.
            The length of each tuple is the batch size.
    """
    results = []
    spatial_features_tuple, temporal_features_tuple, bns_features_tuple, bc_features_tuple, video_name_tuple = data_samples
    # print('spatial_features_tuple len: ', len(spatial_features_tuple)) # B
    # print('spatial_features_tuple[0]: ', spatial_features_tuple[0].shape) # torch.Size([8, 3, 672, 1120])
    # print('temporal_features_tuple[0]: ', temporal_features_tuple[0].shape) # torch.Size([1, 2304])
    # print('bns_features_tuple[0]: ', bns_features_tuple[0].shape) # torch.Size([8, 300])
    # print('bc_features_tuple[0]: ', bc_features_tuple[0].shape) # torch.Size([8, 20])

    batch_size = len(spatial_features_tuple)
    with torch.no_grad():
        for i in range(batch_size):
            video_name = video_name_tuple[i]
            spatial_features = spatial_features_tuple[i].to(self.device) # torch.Size([8, 3, 672, 1120])
            temporal_features = temporal_features_tuple[i].to(self.device) # torch.Size([1, 2304])
            bns_features = bns_features_tuple[i].to(self.device) # torch.Size([8, 300])
            bc_features = bc_features_tuple[i].to(self.device)  # Shape: [8, final_dim(20)]

            concat_features = torch.cat([temporal_features, bc_features.view(1, -1)], dim=1) # torch.Size([1, 2304+8*20])
            # print('concat_features: ', concat_features.shape) # torch.Size([1, 2464])
            final_temporal_features = F.pad(concat_features, (0, 2604 - concat_features.shape[1]), mode="constant", value=0) # torch.Size([1, 2604])
            # print('final_temporal_features: ', final_temporal_features.shape) # torch.Size([1, 2604])

            outputs = self.model(spatial_features, final_temporal_features, bns_features)
            # print('outputs: ', outputs)
            score = outputs.mean().item()

            results.append({"video_name": video_name, "LightVQAPlus_Score": score})
            print(f"Processed score {score:.4f} for {video_name}")

    self.results.extend(results)
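
The five-way data_samples layout documented above can be mocked as follows (placeholder tensors with the documented shapes; lightvqa_metric stands for a constructed LightVQAPlus instance):

import torch

spatial  = (torch.rand(8, 3, 672, 1120),)   # 8 key frames
temporal = (torch.rand(1, 2304),)           # SlowFast motion features
bns      = (torch.rand(8, 300),)            # brightness & noise features
bc       = (torch.rand(8, 20),)             # temporal brightness contrast features
names    = ("video_000.mp4",)               # hypothetical file name
# lightvqa_metric.process(data_batch=[], data_samples=[spatial, temporal, bns, bc, names])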

PickScore

Bases: BaseMetric

Initialize the PickScore evaluator.

Parameters:

Name Type Description Default
model_name str

The name of the PickScore model. Defaults to yuvalkirstain/PickScore_v1.

'yuvalkirstain/PickScore_v1'
logit_scale bool

Whether to calculate the cosine similarity as logits. Defaults to False.

False
Source code in aigve/metrics/text_video_alignment/similarity_based/pickscore/pick_infer.py
@METRICS.register_module()
class PickScore(BaseMetric):
    """ Initialize the ``PickScore`` evaluator.

    Args:
        model_name (str): The name of the PickScore model. Defaults to ``yuvalkirstain/PickScore_v1``.
        logit_scale (bool): Whether to calculate the cosine similarity as logits. Defaults to False.
    """
    def __init__(self, 
                 model_name: str = "yuvalkirstain/PickScore_v1", 
                 logit_scale: bool = False) -> None:
        super().__init__()
        self.model_name = model_name
        self.logit_scale = logit_scale

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = AutoModel.from_pretrained(self.model_name).eval().to(self.device)
        self.model.eval()


    # def process(self, data_batch: dict, data_samples: Sequence[dict]) -> None:
    def process(self, data_batch: Sequence, data_samples: Sequence) -> None:
        """PickScore process
        Process one batch of data samples and predictions. The processed
        results should be stored in ``self.results``, which will be used to
        compute the metrics when all batches have been processed.

        Args:
            data_batch (Sequence): A batch of data from the dataloader.
            data_samples (Sequence): A batch of data samples that
                contain annotations and predictions.
        """

        result = dict()

        input_prompts, input_videos = data_samples
        bsz = len(input_prompts)

        # Ensure prompt_input is a tensor
        if isinstance(input_prompts, tuple):
            input_prompts = list(input_prompts)

        if isinstance(input_videos, tuple):
            input_videos = list(input_videos)

        pickscore_sum, pickscore_cnt = 0, 0
        logit_scale = self.model.logit_scale.exp() if self.logit_scale else 1
        with torch.no_grad():
            for input_prompt, input_frames in zip(input_prompts, input_videos):

                input_prompt = input_prompt.to(self.device)
                text_feature = self.model.get_text_features(input_prompt)
                text_feature = text_feature / torch.norm(text_feature, dim=-1, keepdim=True)

                input_frames = input_frames.to(self.device)  # Add batch dimension and move the frame to the device
                frame_features = self.model.get_image_features(input_frames)
                frame_features = frame_features / torch.norm(frame_features, dim=-1, keepdim=True)

                pick_score = logit_scale *  (frame_features @ text_feature.T).mean().item()
                print('current pickscore', pick_score)
                pickscore_sum += pick_score
                pickscore_cnt += 1

        # get probabilities if you have multiple images to choose from
        # probs = torch.softmax(scores, dim=-1)
        pickscore_total_avg = pickscore_sum/pickscore_cnt
        result['pick_score'] = pickscore_total_avg

        self.results.append(result)


    def compute_metrics(self, results: list) -> Dict[str, float]:
        """Compute the metrics from processed results.

        Args:
            results (list): The processed results of each batch.

        Returns:
            Dict[str, float]: The computed metrics. The keys are the names of
            the metrics, and the values are corresponding results.
        """
        logger: MMLogger = MMLogger.get_current_instance()

        pickscore_np = np.zeros(len(results))
        for i, result in enumerate(results):
            pickscore_np[i] = result['pick_score']

        pickscore_sim_mean = np.mean(pickscore_np) 

        print("Test results: PickScore={:.4f}"
              .format(pickscore_sim_mean))

        return {'pick_score': pickscore_sim_mean}

compute_metrics(results)

Compute the metrics from processed results.

Parameters:

Name Type Description Default
results list

The processed results of each batch.

required

Returns:

Type Description
Dict[str, float]

Dict[str, float]: The computed metrics. The keys are the names of the metrics, and the values are corresponding results.

Source code in aigve/metrics/text_video_alignment/similarity_based/pickscore/pick_infer.py
def compute_metrics(self, results: list) -> Dict[str, float]:
    """Compute the metrics from processed results.

    Args:
        results (list): The processed results of each batch.

    Returns:
        Dict[str, float]: The computed metrics. The keys are the names of
        the metrics, and the values are corresponding results.
    """
    logger: MMLogger = MMLogger.get_current_instance()

    pickscore_np = np.zeros(len(results))
    for i, result in enumerate(results):
        pickscore_np[i] = result['pick_score']

    pickscore_sim_mean = np.mean(pickscore_np) 

    print("Test results: PickScore={:.4f}"
          .format(pickscore_sim_mean))

    return {'pick_score': pickscore_sim_mean}

process(data_batch, data_samples)

PickScore process. Process one batch of data samples and predictions. The processed results should be stored in self.results, which will be used to compute the metrics when all batches have been processed.

Parameters:

Name Type Description Default
data_batch Sequence

A batch of data from the dataloader.

required
data_samples Sequence

A batch of data samples that contain annotations and predictions.

required
Source code in aigve/metrics/text_video_alignment/similarity_based/pickscore/pick_infer.py
def process(self, data_batch: Sequence, data_samples: Sequence) -> None:
    """PickScore process
    Process one batch of data samples and predictions. The processed
    results should be stored in ``self.results``, which will be used to
    compute the metrics when all batches have been processed.

    Args:
        data_batch (Sequence): A batch of data from the dataloader.
        data_samples (Sequence): A batch of data samples that
            contain annotations and predictions.
    """

    result = dict()

    input_prompts, input_videos = data_samples
    bsz = len(input_prompts)

    # Ensure prompt_input is a tensor
    if isinstance(input_prompts, tuple):
        input_prompts = list(input_prompts)

    if isinstance(input_videos, tuple):
        input_videos = list(input_videos)

    pickscore_sum, pickscore_cnt = 0, 0
    logit_scale = self.model.logit_scale.exp() if self.logit_scale else 1
    with torch.no_grad():
        for input_prompt, input_frames in zip(input_prompts, input_videos):

            input_prompt = input_prompt.to(self.device)
            text_feature = self.model.get_text_features(input_prompt)
            text_feature = text_feature / torch.norm(text_feature, dim=-1, keepdim=True)

            input_frames = input_frames.to(self.device)  # Move the frames to the device
            frame_features = self.model.get_image_features(input_frames)
            frame_features = frame_features / torch.norm(frame_features, dim=-1, keepdim=True)

            pick_score = logit_scale *  (frame_features @ text_feature.T).mean().item()
            print('current pickscore', pick_score)
            pickscore_sum += pick_score
            pickscore_cnt += 1

    # get probabilities if you have multiple images to choose from
    # probs = torch.softmax(scores, dim=-1)
    pickscore_total_avg = pickscore_sum/pickscore_cnt
    result['pick_score'] = pickscore_total_avg

    self.results.append(result)
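The score above is just the average cosine similarity between the L2-normalized text embedding and the per-frame image embeddings, optionally multiplied by the model's logit scale. A minimal sketch of that computation on plain tensors (the 1024-dimensional random features are illustrative stand-ins, not real PickScore embeddings):

import torch

# Illustrative stand-ins for already-extracted embeddings: 8 frames and 1 prompt.
frame_features = torch.randn(8, 1024)
text_feature = torch.randn(1, 1024)

# L2-normalize so that the dot product equals the cosine similarity.
frame_features = frame_features / torch.norm(frame_features, dim=-1, keepdim=True)
text_feature = text_feature / torch.norm(text_feature, dim=-1, keepdim=True)

# Average the per-frame similarities; logit_scale=1 matches the default (unscaled) setting.
logit_scale = 1.0
pick_score = logit_scale * (frame_features @ text_feature.T).mean().item()
print(pick_score)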

SimpleVqa

Bases: BaseMetric

SimpleVQA metric for evaluating video quality.

Source code in aigve/metrics/video_quality_assessment/nn_based/simplevqa/simplevqa_metric.py
@METRICS.register_module()
class SimpleVqa(BaseMetric):
    """SimpleVQA metric for evaluating video quality."""
    def __init__(self, model_path: str, is_gpu: bool = True):
        super(SimpleVqa, self).__init__()
        self.model_path = model_path
        self.device = torch.device("cuda" if is_gpu else "cpu")
        self.submodel_path = os.path.join(os.getcwd(), 'metrics/video_quality_assessment/nn_based/simplevqa')
        if not submodule_exists(self.submodel_path):
            add_git_submodule(
                repo_url='https://github.com/sunwei925/SimpleVQA.git', 
                submodule_path=self.submodel_path
            )
        simplevqa_path = os.path.join(self.submodel_path, "SimpleVQA")
        if simplevqa_path not in sys.path:
            sys.path.insert(0, simplevqa_path)
        from .SimpleVQA.model import UGC_BVQA_model
        from .SimpleVQA.test_demo import slowfast
        self.model_motion = slowfast().to(self.device)
        self.model = UGC_BVQA_model.resnet50(pretrained=False)
        self.model = torch.nn.DataParallel(self.model).to(self.device)
        self.model.load_state_dict(torch.load(os.path.join(os.getcwd(), self.model_path), map_location=self.device))
        self.model.eval()

    def process(self, data_batch: list, data_samples: list) -> None:
        """
        Process a batch of extracted deep features for SimpleVQA evaluation.
        Args:
            data_batch (Sequence): A batch of data from the dataloader (not used here).
            data_samples (List[ Tuple[torch.Tensor], List[Tuple[torch.Tensor]], Tuple[str] ]):
                A list containing three tuples:
                - A tuple of `spatial_features` (torch.Tensor): Shape [v_len_second, 3, 448, 448].
                    `v_len_second` is the total number of seconds of the video (e.g., 2 for the toy dataset), with a minimum of 8 (i.e. min_video_seconds).
                    The length of the tuple is the batch size.
                - A list of `motion_features` (Tuple[torch.Tensor]):
                    The length of the list is the total number of seconds of the video, with a minimum of 8 (i.e. min_video_seconds).
                    Each item of the list is a tuple of motion feature tensors, each of shape [32, 3, 224, 224].
                    The length of the tuple is the batch size.
                - A tuple of `video_name` (str): Video filename. The length of the tuple is the batch size.
        """
        from .SimpleVQA.test_demo import pack_pathway_output

        results = []
        # print(type(data_samples)) # list
        spatial_features_tuple, motion_features_list, video_name_tuple = data_samples
        # print(len(spatial_features_tuple)) # 1
        # print(spatial_features_tuple[0].shape) # torch.Size([8, 3, 448, 448])

        # print(type(motion_features_list)) # List
        # print(len(motion_features_list)) # 8
        # print(type(motion_features_list[0])) # tuple
        # print(len(motion_features_list[0])) # 1
        # print(type(motion_features_list[0][0])) # Tensor
        # print(motion_features_list[0][0].shape) # torch.Size([32, 3, 224, 224])

        batch_size = len(spatial_features_tuple)
        with torch.no_grad():
            for i in range(batch_size):
                video_name = video_name_tuple[i]
                spatial_features = spatial_features_tuple[i].to(self.device).unsqueeze(0)  # Add batch dim. Shape: tensor with Size([1, v_len_second, 3, 448, 448])

                # Take the i-th element from each tuple in motion_features_list
                motion_features = [motion_features_list[j][i] for j in range(len(motion_features_list))] # Shape: List[tensor with Size([32, 3, 224, 224])]; its length is the total number of seconds of the video, with a minimum of 8.

                if not all(isinstance(mf, torch.Tensor) for mf in motion_features):
                    raise TypeError("Expected motion_features to be a list of tensors.")

                if len(motion_features) == 0:  # Edge case: No valid motion features
                    results.append({"video_name": video_name, "SimpleVQA_Score": 0.0})
                    continue

                n_clip = len(motion_features)  # 8
                feature_motion = torch.zeros([n_clip, 2048 + 256], device=self.device) 
                # Process each motion clip
                for idx, clip in enumerate(motion_features):
                    clip = clip.unsqueeze(dim=0).permute(0, 2, 1, 3, 4)  # Reshape to [1, C(3), T(32), H(224), W(224)]
                    clip = pack_pathway_output(clip, self.device)  # Convert to SlowFast format
                    slow_feature, fast_feature = self.model_motion(clip)
                    slow_feature = slow_feature.squeeze()
                    fast_feature = fast_feature.squeeze()

                    motion_feature = torch.cat([slow_feature, fast_feature]).unsqueeze(0)  # Shape: [1, 2304]
                    feature_motion[idx] = motion_feature 

                feature_motion = feature_motion.unsqueeze(0)  # Shape: [1, n_clip, 2304]

                outputs = self.model(spatial_features, feature_motion)
                score = outputs.item()

                results.append({"video_name": video_name, "SimpleVQA_Score": score})
                print(f"Processed score {score:.4f} for {video_name}")

        self.results.extend(results)

    def compute_metrics(self, results: list) -> Dict[str, float]:
        """Compute final SimpleVQA-based metrics."""
        scores = np.array([res["SimpleVQA_Score"] for res in self.results])
        mean_score = np.mean(scores) if scores.size > 0 else 0.0
        print(f"SimpleVQA mean score: {mean_score:.4f}")

        json_file_path = os.path.join(os.getcwd(), "simplevqa_results.json")
        final_results = {"video_results": self.results, "SimpleVQA_Mean_Score": mean_score}
        with open(json_file_path, "w") as json_file:
            json.dump(final_results, json_file, indent=4)
        print(f"SimpleVQA mean score saved to {json_file_path}")

        return {"SimpleVQA_Mean_Score": mean_score}

compute_metrics(results)

Compute final SimpleVQA-based metrics.

Source code in aigve/metrics/video_quality_assessment/nn_based/simplevqa/simplevqa_metric.py
def compute_metrics(self, results: list) -> Dict[str, float]:
    """Compute final SimpleVQA-based metrics."""
    scores = np.array([res["SimpleVQA_Score"] for res in self.results])
    mean_score = np.mean(scores) if scores.size > 0 else 0.0
    print(f"SimpleVQA mean score: {mean_score:.4f}")

    json_file_path = os.path.join(os.getcwd(), "simplevqa_results.json")
    final_results = {"video_results": self.results, "SimpleVQA_Mean_Score": mean_score}
    with open(json_file_path, "w") as json_file:
        json.dump(final_results, json_file, indent=4)
    print(f"SimpleVQA mean score saved to {json_file_path}")

    return {"SimpleVQA_Mean_Score": mean_score}

process(data_batch, data_samples)

Process a batch of extracted deep features for SimpleVQA evaluation.

Parameters:

Name Type Description Default
data_batch Sequence

A batch of data from the dataloader (not used here).

required
data_samples List[Tuple[torch.Tensor], List[Tuple[torch.Tensor]], Tuple[str]]

A list containing three elements: a tuple of spatial_features (torch.Tensor) of shape [v_len_second, 3, 448, 448], where v_len_second is the total number of seconds of the video (e.g., 2 for the toy dataset), with a minimum of 8 (i.e. min_video_seconds), and the length of the tuple is the batch size; a list of motion_features (Tuple[torch.Tensor]), whose length is the total number of seconds of the video (minimum 8), where each item is a tuple of motion feature tensors of shape [32, 3, 224, 224] and the length of each tuple is the batch size; and a tuple of video_name (str) filenames, whose length is the batch size.

required
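For reference, a dummy data_samples object matching the structure described above (batch size 1, an 8-second video, random tensors in place of real features) could be assembled as follows:

import torch

batch_size, seconds = 1, 8  # minimum of 8 one-second clips

# Tuple of per-video spatial features, each of shape [v_len_second, 3, 448, 448].
spatial_features_tuple = tuple(torch.randn(seconds, 3, 448, 448) for _ in range(batch_size))

# List with one entry per second; each entry is a per-batch tuple of motion clips of shape [32, 3, 224, 224].
motion_features_list = [
    tuple(torch.randn(32, 3, 224, 224) for _ in range(batch_size))
    for _ in range(seconds)
]

# Tuple of video filenames.
video_name_tuple = tuple(f"video_{i}.mp4" for i in range(batch_size))

data_samples = [spatial_features_tuple, motion_features_list, video_name_tuple]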

Source code in aigve/metrics/video_quality_assessment/nn_based/simplevqa/simplevqa_metric.py
def process(self, data_batch: list, data_samples: list) -> None:
    """
    Process a batch of extracted deep features for SimpleVQA evaluation.
    Args:
        data_batch (Sequence): A batch of data from the dataloader (not used here).
        data_samples (List[ Tuple[torch.Tensor], List[Tuple[torch.Tensor]], Tuple[str] ]):
            A list containing three tuples:
            - A tuple of `spatial_features` (torch.Tensor): Shape [v_len_second, 3, 448, 448].
                `v_len_second` is the total number of seconds of the video (e.g., 2 for the toy dataset), with a minimum of 8 (i.e. min_video_seconds).
                The length of the tuple is the batch size.
            - A list of `motion_features` (Tuple[torch.Tensor]):
                The length of the list is the total number of seconds of the video, with a minimum of 8 (i.e. min_video_seconds).
                Each item of the list is a tuple of motion feature tensors, each of shape [32, 3, 224, 224].
                The length of the tuple is the batch size.
            - A tuple of `video_name` (str): Video filename. The length of the tuple is the batch size.
    """
    from .SimpleVQA.test_demo import pack_pathway_output

    results = []
    # print(type(data_samples)) # list
    spatial_features_tuple, motion_features_list, video_name_tuple = data_samples
    # print(len(spatial_features_tuple)) # 1
    # print(spatial_features_tuple[0].shape) # torch.Size([8, 3, 448, 448])

    # print(type(motion_features_list)) # List
    # print(len(motion_features_list)) # 8
    # print(type(motion_features_list[0])) # tuple
    # print(len(motion_features_list[0])) # 1
    # print(type(motion_features_list[0][0])) # Tensor
    # print(motion_features_list[0][0].shape) # torch.Size([32, 3, 224, 224])

    batch_size = len(spatial_features_tuple)
    with torch.no_grad():
        for i in range(batch_size):
            video_name = video_name_tuple[i]
            spatial_features = spatial_features_tuple[i].to(self.device).unsqueeze(0)  # Add batch dim. Shape: tensor with Size([1, v_len_second, 3, 448, 448])

            # Take the i-th element from each tuple in motion_features_list
            motion_features = [motion_features_list[j][i] for j in range(len(motion_features_list))] # Shape: List[tensor with Size([32, 3, 224, 224])]; its length is the total number of seconds of the video, with a minimum of 8.

            if not all(isinstance(mf, torch.Tensor) for mf in motion_features):
                raise TypeError("Expected motion_features to be a list of tensors.")

            if len(motion_features) == 0:  # Edge case: No valid motion features
                results.append({"video_name": video_name, "SimpleVQA_Score": 0.0})
                continue

            n_clip = len(motion_features)  # 8
            feature_motion = torch.zeros([n_clip, 2048 + 256], device=self.device) 
            # Process each motion clip
            for idx, clip in enumerate(motion_features):
                clip = clip.unsqueeze(dim=0).permute(0, 2, 1, 3, 4)  # Reshape to [1, C(3), T(32), H(224), W(224)]
                clip = pack_pathway_output(clip, self.device)  # Convert to SlowFast format
                slow_feature, fast_feature = self.model_motion(clip)
                slow_feature = slow_feature.squeeze()
                fast_feature = fast_feature.squeeze()

                motion_feature = torch.cat([slow_feature, fast_feature]).unsqueeze(0)  # Shape: [1, 2304]
                feature_motion[idx] = motion_feature 

            feature_motion = feature_motion.unsqueeze(0)  # Shape: [1, n_clip, 2304]

            outputs = self.model(spatial_features, feature_motion)
            score = outputs.item()

            results.append({"video_name": video_name, "SimpleVQA_Score": score})
            print(f"Processed score {score:.4f} for {video_name}")

    self.results.extend(results)
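The motion branch stacks one 2304-dimensional feature per one-second clip (the concatenated slow- and fast-pathway features, 2048 + 256 channels) into a [1, n_clip, 2304] tensor before it reaches the quality model. A small sketch of that assembly with random tensors standing in for the SlowFast outputs:

import torch

n_clip = 8  # one clip per second of video, minimum 8
feature_motion = torch.zeros(n_clip, 2048 + 256)

for idx in range(n_clip):
    # Random stand-ins for the slow/fast pathway features returned by the SlowFast model.
    slow_feature = torch.randn(2048)
    fast_feature = torch.randn(256)
    feature_motion[idx] = torch.cat([slow_feature, fast_feature])

feature_motion = feature_motion.unsqueeze(0)  # shape [1, n_clip, 2304]
print(feature_motion.shape)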

TIFAScore

Bases: BaseMetric

Initialize the TIFAScore evaluator.

Parameters:

Name Type Description Default
openai_key str

The user's API key for the LLM models provided by OpenAI.

required
llm_model str

The name of the LLM model used in the TIFAScore evaluator. Defaults to gpt-3.5-turbo.

'gpt-3.5-turbo'
unifiedqa_model_name str

The name of the UnifiedQAModel used in TIFAScore evaluator. Defaults to allenai/unifiedqa-v2-t5-large-1363200.

'allenai/unifiedqa-v2-t5-large-1363200'
vqa_model_name str

The name of the AIGVEModel used in the TIFAScore evaluator. Defaults to mplug-large.

'mplug-large'
Source code in aigve/metrics/text_video_alignment/gpt_based/TIFA/tifa_eval.py
@METRICS.register_module()
class TIFAScore(BaseMetric):
    """ Initialize the ``TIFAScore`` evaluator.

    Args:   
        openai_key (str): The user's API key for the LLM models provided by OpenAI.
        llm_model (str): The name of the LLM model used in the TIFAScore evaluator. Defaults to ``gpt-3.5-turbo``.
        unifiedqa_model_name (str): The name of the ``UnifiedQAModel`` used in the TIFAScore evaluator. Defaults to ``allenai/unifiedqa-v2-t5-large-1363200``.
        vqa_model_name (str): The name of the ``AIGVEModel`` used in the TIFAScore evaluator. Defaults to ``mplug-large``.
    """
    def __init__(self, 
                 openai_key,
                 llm_model: str = 'gpt-3.5-turbo',
                 unifiedqa_model_name: str = 'allenai/unifiedqa-v2-t5-large-1363200',
                 vqa_model_name: str = 'mplug-large'):
        super().__init__()

        self.openai_key = openai_key
        self.llm_model = llm_model
        self.unifiedqa_model_name = unifiedqa_model_name
        self.openai_completion, self.get_question_and_answers, self.filter_question_and_answers, self.unifiedqa_model, self.tifa_score_single, self.vqa_model = lazy_import()
        self.unifiedqa_model = self.UnifiedQAModel(self.unifiedqa_model_name)
        self.vqa_model_name = vqa_model_name
        self.vqa_model = self.AIGVEModel(self.vqa_model_name)

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        self.openai_setup()

    def openai_setup(self):
        print('set up openai client')
        openai.api_key = self.openai_key
        assert openai.api_key is not None
        test_prompt_string = 'hello, how are you doing?'
        print('test prompt: ', test_prompt_string)
        response = self.openai_completion(
            test_prompt_string,
            model=self.llm_model,
        )
        print('test response: ', response)


    def process(self, data_batch: Sequence, data_samples: Sequence) -> None:
        """ TIFAScore process
        Process one batch of data samples and predictions. The processed
        results should be stored in ``self.results``, which will be used to
        compute the metrics when all batches have been processed.

        Args:
            data_batch (Sequence): A batch of data from the dataloader.
            data_samples (Sequence): A batch of data samples that
                contain annotations and predictions.
        """

        result = dict()

        input_prompts, input_videos = data_samples
        bsz = len(input_prompts)

        # Ensure prompt_input is a tensor
        if isinstance(input_prompts, tuple):
            input_prompts = list(input_prompts)

        if isinstance(input_videos, tuple):
            input_videos = list(input_videos)

        average_tifa_score_list = []
        for input_prompt, input_video in zip(input_prompts, input_videos):
            tifa_score = []
            # Generate questions with GPT-3.5-turbo
            gpt3_questions = self.get_question_and_answers(input_prompt)
            # print(gpt3_questions)
            # Filter questions with UnifiedQA
            filtered_questions = self.filter_question_and_answers(self.unifiedqa_model, gpt3_questions)
            for index, frame_path in enumerate(input_video):
                # Calculate the TIFA score for each frame
                frame_result = self.tifa_score_single(self.vqa_model, filtered_questions, frame_path)
                # print(frame_result)
                tifa_score.append(frame_result['tifa_score'])
            average_tifa_score = sum(tifa_score)/len(tifa_score)
            average_tifa_score_list.append(average_tifa_score)

        result['tifa_score'] = sum(average_tifa_score_list)/len(average_tifa_score_list)

        self.results.append(result)


    def compute_metrics(self, results: list) -> Dict[str, float]:
        """Compute the metrics from processed results.

        Args:
            results (list): The processed results of each batch.

        Returns:
            Dict[str, float]: The computed metrics. The keys are the names of
            the metrics, and the values are corresponding results.
        """
        logger: MMLogger = MMLogger.get_current_instance()

        tifa_score_np = np.zeros(len(results))
        for i, result in enumerate(results):
            tifa_score_np[i] = result['tifa_score']

        tifa_score_np_mean = np.mean(tifa_score_np) 

        print("Test results: tifa score={:.4f}"
              .format(tifa_score_np_mean))

        return {'tifa_score': tifa_score_np_mean}

compute_metrics(results)

Compute the metrics from processed results.

Parameters:

Name Type Description Default
results list

The processed results of each batch.

required

Returns:

Type Description
Dict[str, float]

The computed metrics. The keys are the names of the metrics, and the values are corresponding results.

Source code in aigve/metrics/text_video_alignment/gpt_based/TIFA/tifa_eval.py
def compute_metrics(self, results: list) -> Dict[str, float]:
    """Compute the metrics from processed results.

    Args:
        results (list): The processed results of each batch.

    Returns:
        Dict[str, float]: The computed metrics. The keys are the names of
        the metrics, and the values are corresponding results.
    """
    logger: MMLogger = MMLogger.get_current_instance()

    tifa_score_np = np.zeros(len(results))
    for i, result in enumerate(results):
        tifa_score_np[i] = result['tifa_score']

    tifa_score_np_mean = np.mean(tifa_score_np) 

    print("Test results: tifa score={:.4f}"
          .format(tifa_score_np_mean))

    return {'tifa_score': tifa_score_np_mean}

process(data_batch, data_samples)

TIFAScore process. Process one batch of data samples and predictions. The processed results should be stored in self.results, which will be used to compute the metrics when all batches have been processed.

Parameters:

Name Type Description Default
data_batch Sequence

A batch of data from the dataloader.

required
data_samples Sequence

A batch of data samples that contain annotations and predictions.

required
Source code in aigve/metrics/text_video_alignment/gpt_based/TIFA/tifa_eval.py
def process(self, data_batch: Sequence, data_samples: Sequence) -> None:
    """ TIFAScore process
    Process one batch of data samples and predictions. The processed
    results should be stored in ``self.results``, which will be used to
    compute the metrics when all batches have been processed.

    Args:
        data_batch (Sequence): A batch of data from the dataloader.
        data_samples (Sequence): A batch of data samples that
            contain annotations and predictions.
    """

    result = dict()

    input_prompts, input_videos = data_samples
    bsz = len(input_prompts)

    # Ensure prompt_input is a tensor
    if isinstance(input_prompts, tuple):
        input_prompts = list(input_prompts)

    if isinstance(input_videos, tuple):
        input_videos = list(input_videos)

    average_tifa_score_list = []
    for input_prompt, input_video in zip(input_prompts, input_videos):
        tifa_score = []
        # Generate questions with GPT-3.5-turbo
        gpt3_questions = self.get_question_and_answers(input_prompt)
        # print(gpt3_questions)
        # Filter questions with UnifiedQA
        filtered_questions = self.filter_question_and_answers(self.unifiedqa_model, gpt3_questions)
        for index, frame_path in enumerate(input_video):
            # Calculate the TIFA score for each frame
            frame_result = self.tifa_score_single(self.vqa_model, filtered_questions, frame_path)
            # print(frame_result)
            tifa_score.append(frame_result['tifa_score'])
        average_tifa_score = sum(tifa_score)/len(tifa_score)
        average_tifa_score_list.append(average_tifa_score)

    result['tifa_score'] = sum(average_tifa_score_list)/len(average_tifa_score_list)

    self.results.append(result)
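The batch-level score is a two-stage average: per-frame TIFA scores are first averaged within each video, and those per-video averages are then averaged over the batch. A tiny sketch with hypothetical per-frame scores:

# Hypothetical per-frame TIFA scores for a batch of two videos.
per_frame_scores = [
    [1.0, 0.5, 0.75],  # video 1
    [0.5, 0.75],       # video 2
]

per_video_avgs = [sum(scores) / len(scores) for scores in per_frame_scores]
batch_tifa_score = sum(per_video_avgs) / len(per_video_avgs)
print(per_video_avgs, batch_tifa_score)  # [0.75, 0.625] 0.6875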

VIEEvalScore

Bases: BaseMetric

Initialize the VIEEvalScore evaluator.

Parameters:

Name Type Description Default
llm_backbone str

The name of the LLM model used in the VIEEvalScore evaluator. Defaults to gpt4o.

'gpt4o'
api_key_path str

The path to the user's API key used to initialize the LLM models provided by OpenAI.

'AIGVE_Tool/metrics/text_video_alignment/gpt_based/VIE/api_key.txt'
task str

The task the VIEEvalScore evaluator conducts. Defaults to 't2v'.

't2v'
Source code in aigve/metrics/text_video_alignment/gpt_based/VIE/vie_eval.py
@METRICS.register_module()
class VIEEvalScore(BaseMetric):
    """ Initialize the ``VIEEvalScore`` evaluator.

    Args:
        llm_backbone (str): The name of the LLM model used in the VIEEvalScore evaluator. Defaults to ``gpt4o``.
        api_key_path (str): The path to the user's API key used to initialize the LLM models provided by OpenAI.
        task (str): The task the VIEEvalScore evaluator conducts. Defaults to ``t2v``.
    """
    def __init__(self,
                 llm_backbone: str = "gpt4o",
                 api_key_path: str = 'AIGVE_Tool/metrics/text_video_alignment/gpt_based/VIE/api_key.txt',
                 task: str = 't2v',
                 ):
        super().__init__()

        self.api_key_path = api_key_path
        self.llm_backbone = llm_backbone
        self.task = task

        self.submodel_path = 'metrics/text_video_alignment/gpt_based/VIE'
        if not submodule_exists(self.submodel_path):
            add_git_submodule(
                repo_url='https://github.com/TIGER-AI-Lab/VIEScore.git', 
                submodule_path=self.submodel_path
            )  
        self.submodel_path = 'metrics/text_video_alignment/gpt_based/dsg'
        if not submodule_exists(self.submodel_path):
            add_git_submodule(
                repo_url='https://github.com/j-min/DSG.git', 
                submodule_path=self.submodel_path
            )  
        from .VIEScore.viescore import VIEScore 
        from .DSG.dsg.vqa_utils import MPLUG, InstructBLIP


        self.vie_score = VIEScore(backbone=self.llm_backbone, task=self.task, key_path=self.api_key_path)

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


    def process(self, data_batch: Sequence, data_samples: Sequence) -> None:
        """VIEScore process
        Process one batch of data samples and predictions. The processed
        results should be stored in ``self.results``, which will be used to
        compute the metrics when all batches have been processed.

        Args:
            data_batch (Sequence): A batch of data from the dataloader.
            data_samples (Sequence): A batch of data samples that
                contain annotations and predictions.
        """

        result = dict()

        input_prompts, input_videos = data_samples
        bsz = len(input_prompts)

        # Ensure prompt_input is a tensor
        if isinstance(input_prompts, tuple):
            input_prompts = list(input_prompts)

        if isinstance(input_videos, tuple):
            input_videos = list(input_videos)

        average_vie_score_list = []
        for input_prompt, input_video in zip(input_prompts, input_videos):
            vie_score_list = []
            for index, frame_path in enumerate(input_video):
                pil_image = Image.open(frame_path)
                score_list = self.vie_score.evaluate(pil_image, input_prompt)
                semantics_score, quality_score, overall_score = score_list
                vie_score_list.append(overall_score)
            average_vie_score = sum(vie_score_list)/len(vie_score_list)
            average_vie_score_list.append(average_vie_score)

        result['vie_score'] = sum(average_vie_score_list)/len(average_vie_score_list)

        self.results.append(result)


    def compute_metrics(self, results: list) -> Dict[str, float]:
        """Compute the metrics from processed results.

        Args:
            results (list): The processed results of each batch.

        Returns:
            Dict[str, float]: The computed metrics. The keys are the names of
            the metrics, and the values are corresponding results.
        """
        logger: MMLogger = MMLogger.get_current_instance()

        vie_score_np = np.zeros(len(results))
        for i, result in enumerate(results):
            vie_score_np[i] = result['vie_score']

        vie_score_np_mean = np.mean(vie_score_np) 

        print("Test results: vie score with dependency={:.4f}"
              .format(vie_score_np_mean))

        return {'vie_score': vie_score_np_mean}

compute_metrics(results)

Compute the metrics from processed results.

Parameters:

Name Type Description Default
results list

The processed results of each batch.

required

Returns:

Type Description
Dict[str, float]

The computed metrics. The keys are the names of the metrics, and the values are corresponding results.

Source code in aigve/metrics/text_video_alignment/gpt_based/VIE/vie_eval.py
def compute_metrics(self, results: list) -> Dict[str, float]:
    """Compute the metrics from processed results.

    Args:
        results (list): The processed results of each batch.

    Returns:
        Dict[str, float]: The computed metrics. The keys are the names of
        the metrics, and the values are corresponding results.
    """
    logger: MMLogger = MMLogger.get_current_instance()

    vie_score_np = np.zeros(len(results))
    for i, result in enumerate(results):
        vie_score_np[i] = result['vie_score']

    vie_score_np_mean = np.mean(vie_score_np) 

    print("Test results: vie score with dependency={:.4f}"
          .format(vie_score_np_mean))

    return {'vie_score': vie_score_np_mean}

process(data_batch, data_samples)

VIEScore process. Process one batch of data samples and predictions. The processed results should be stored in self.results, which will be used to compute the metrics when all batches have been processed.

Parameters:

Name Type Description Default
data_batch Sequence

A batch of data from the dataloader.

required
data_samples Sequence

A batch of data samples that contain annotations and predictions.

required
Source code in aigve/metrics/text_video_alignment/gpt_based/VIE/vie_eval.py
def process(self, data_batch: Sequence, data_samples: Sequence) -> None:
    """VIEScore process
    Process one batch of data samples and predictions. The processed
    results should be stored in ``self.results``, which will be used to
    compute the metrics when all batches have been processed.

    Args:
        data_batch (Sequence): A batch of data from the dataloader.
        data_samples (Sequence): A batch of data samples that
            contain annotations and predictions.
    """

    result = dict()

    input_prompts, input_videos = data_samples
    bsz = len(input_prompts)

    # Ensure prompt_input is a tensor
    if isinstance(input_prompts, tuple):
        input_prompts = list(input_prompts)

    if isinstance(input_videos, tuple):
        input_videos = list(input_videos)

    average_vie_score_list = []
    for input_prompt, input_video in zip(input_prompts, input_videos):
        vie_score_list = []
        for index, frame_path in enumerate(input_video):
            pil_image = Image.open(frame_path)
            score_list = self.vie_score.evaluate(pil_image, input_prompt)
            semantics_score, quality_score, overall_score = score_list
            vie_score_list.append(overall_score)
        average_vie_score = sum(vie_score_list)/len(vie_score_list)
        average_vie_score_list.append(average_vie_score)

    result['vie_score'] = sum(average_vie_score_list)/len(average_vie_score_list)

    self.results.append(result)
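VIEScore.evaluate returns a per-frame [semantics, quality, overall] triple, and only the overall score contributes to the final metric. A small sketch of the per-video averaging with hypothetical triples:

# Hypothetical [semantics, quality, overall] triples for the frames of one video.
frame_score_lists = [
    [7.0, 8.0, 7.5],
    [6.0, 9.0, 7.0],
]

vie_score_list = []
for score_list in frame_score_lists:
    semantics_score, quality_score, overall_score = score_list
    vie_score_list.append(overall_score)

average_vie_score = sum(vie_score_list) / len(vie_score_list)
print(average_vie_score)  # 7.25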

VideoPhy

Bases: BaseMetric

Source code in aigve/metrics/multi_aspect_metrics/videophy/videophy_metric.py
@METRICS.register_module()
class VideoPhy(BaseMetric):
    def __init__(self,
                hf_token: str,
                collect_device: Optional[Union[str, torch.device]] = None,
                prefix: Optional[str] = None,
                metric_path: str = None,
                model_path: str = 'videophysics/videocon_physics',
                datainfo_path: str = None,
                test_index: int = None,
                 **kwargs):

        """
        This function is used to initialize the VideoPhy metric.

        Args:
            collect_device (str or torch.device): The device to use for collecting the data
            prefix (str): The prefix to use for the metric name
            metric_path (str): The path to the metric
            model_path (str): The path to the model
            datainfo_path (str): The path to the data info
            test_index (int): The index of the test
        """

        super().__init__(collect_device=collect_device, prefix=prefix)
        # self.train_index = train_index
        self.metric_path = metric_path
        self.model_path = model_path
        self.datainfo_path = datainfo_path
        self.test_index = test_index
        self.hf_token = hf_token
        self.results = []

        # self.submodule_path = './metrics/aigve'
        # if not submodule_exists(self.submodule_path):
        #     add_git_submodule(
        #         repo_url='https://github.com/Hritikbansal/videophy.git',
        #         submodule_path=self.submodule_path
        #     )

        self.tokenizer = LlamaTokenizer.from_pretrained(self.model_path, token=self.hf_token)
        self.image_processor = MplugOwlImageProcessor.from_pretrained(self.model_path)
        self.processor = MplugOwlProcessor(self.image_processor, self.tokenizer)
        self.model = MplugOwlForConditionalGeneration.from_pretrained(
            self.model_path,
            torch_dtype=torch.bfloat16,
        ).to('cuda')
        self.model.eval()

    def get_entail(self, logits, input_ids):
        """
        This function is used to get the entailment scores.

        Args:
            logits (torch.Tensor): A tensor containing the logits
            input_ids (torch.Tensor): A tensor containing the input IDs
        """
        softmax = nn.Softmax(dim=2)
        logits = softmax(logits)
        token_id_yes = self.tokenizer.encode('Yes', add_special_tokens=False)[0]
        token_id_no = self.tokenizer.encode('No', add_special_tokens=False)[0]
        entailment = []
        for j in range(len(logits)):
            for i in range(len(input_ids[j])):
                if input_ids[j][i] == self.tokenizer.pad_token_id:  # pad token if the answer is not present
                    i = i - 1
                    break
                elif i == len(input_ids[j]) - 1:
                    break
            score = logits[j][i][token_id_yes] / (logits[j][i][token_id_yes] + logits[j][i][token_id_no])
            entailment.append(score)
        entailment = torch.stack(entailment)
        return entailment

    def get_logits(self, data_batch):
        """
        This function is used to get the logits for each input in the data batch.

        Args:
            data_batch (dict): A dictionary containing the data batch
        Returns:
            logits (torch.Tensor): A tensor containing the logits for each input in the data batch
        """
        # Iterate over each item in the data batch
        for k, v in data_batch.items():
            # Check if the item is a tensor
            if torch.is_tensor(v):
                # Convert float tensors to bfloat16
                if v.dtype == torch.float:
                    data_batch[k] = v.bfloat16()
                # Move the tensor to the model's device (e.g., GPU)
                data_batch[k] = data_batch[k].to(self.model.device)

        # print("Data batch: ", data_batch.keys())
        outputs = self.model(pixel_values=data_batch['pixel_values'], video_pixel_values=data_batch['video_pixel_values'],
                        labels=None, \
                        num_images=data_batch['num_images'], num_videos=data_batch['num_videos'], input_ids=data_batch['input_ids'],
                        non_padding_mask=data_batch['non_padding_mask'], \
                        non_media_mask=data_batch['non_media_mask'], prompt_mask=data_batch['prompt_mask'])
        logits = outputs['logits']
        return logits


    def process(self, data_batch: Any, data_samples: Sequence[dict]) -> None:
        """
        This function is used to process the data batch and compute the metric.

        Args:
            data_batch (dict): A dictionary containing the data batch
            data_samples (list): A list of dictionaries containing the data samples
        """
        logits = self.get_logits(data_batch)
        entails_scores =  self.get_entail(logits, data_batch['input_ids'])

        self.results.extend(entails_scores.cpu().detach().to(torch.float32).numpy().tolist())
        # self.results = entails_scores.cpu().detach().to(torch.float32).numpy().tolist()
        # print(self.results)


    def compute_metrics(self, results: list) -> dict:
        """
        This function is used to compute the metrics.

        Args:
            results (list): A list of results
        """
        return {
            'entailment': float(np.mean(results))
        }

__init__(hf_token, collect_device=None, prefix=None, metric_path=None, model_path='videophysics/videocon_physics', datainfo_path=None, test_index=None, **kwargs)

This function is used to initialize the VideoPhy metric.

Parameters:

Name Type Description Default
collect_device str or device

The device to use for collecting the data

None
prefix str

The prefix to use for the metric name

None
metric_path str

The path to the metric

None
model_path str

The path to the model

'videophysics/videocon_physics'
datainfo_path str

The path to the data info

None
test_index int

The index of the test

None
Source code in aigve/metrics/multi_aspect_metrics/videophy/videophy_metric.py
def __init__(self,
            hf_token: str,
            collect_device: Optional[Union[str, torch.device]] = None,
            prefix: Optional[str] = None,
            metric_path: str = None,
            model_path: str = 'videophysics/videocon_physics',
            datainfo_path: str = None,
            test_index: int = None,
             **kwargs):

    """
    This function is used to initialize the VideoPhy metric.

    Args:
        collect_device (str or torch.device): The device to use for collecting the data
        prefix (str): The prefix to use for the metric name
        metric_path (str): The path to the metric
        model_path (str): The path to the model
        datainfo_path (str): The path to the data info
        test_index (int): The index of the test
    """

    super().__init__(collect_device=collect_device, prefix=prefix)
    # self.train_index = train_index
    self.metric_path = metric_path
    self.model_path = model_path
    self.datainfo_path = datainfo_path
    self.test_index = test_index
    self.hf_token = hf_token
    self.results = []

    # self.submodule_path = './metrics/aigve'
    # if not submodule_exists(self.submodule_path):
    #     add_git_submodule(
    #         repo_url='https://github.com/Hritikbansal/videophy.git',
    #         submodule_path=self.submodule_path
    #     )

    self.tokenizer = LlamaTokenizer.from_pretrained(self.model_path, token=self.hf_token)
    self.image_processor = MplugOwlImageProcessor.from_pretrained(self.model_path)
    self.processor = MplugOwlProcessor(self.image_processor, self.tokenizer)
    self.model = MplugOwlForConditionalGeneration.from_pretrained(
        self.model_path,
        torch_dtype=torch.bfloat16,
    ).to('cuda')
    self.model.eval()

compute_metrics(results)

This function is used to compute the metrics.

Parameters:

Name Type Description Default
results list

A list of results

required
Source code in aigve/metrics/multi_aspect_metrics/videophy/videophy_metric.py
def compute_metrics(self, results: list) -> dict:
    """
    This function is used to compute the metrics.

    Args:
        results (list): A list of results
    """
    return {
        'entailment': float(np.mean(results))
    }

get_entail(logits, input_ids)

This function is used to get the entailment scores.

Parameters:

Name Type Description Default
logits Tensor

A tensor containing the logits

required
input_ids Tensor

A tensor containing the input IDs

required
Source code in aigve/metrics/multi_aspect_metrics/videophy/videophy_metric.py
def get_entail(self, logits, input_ids):
    """
    This function is used to get the entailment scores.

    Args:
        logits (torch.Tensor): A tensor containing the logits
        input_ids (torch.Tensor): A tensor containing the input IDs
    """
    softmax = nn.Softmax(dim=2)
    logits = softmax(logits)
    token_id_yes = self.tokenizer.encode('Yes', add_special_tokens=False)[0]
    token_id_no = self.tokenizer.encode('No', add_special_tokens=False)[0]
    entailment = []
    for j in range(len(logits)):
        for i in range(len(input_ids[j])):
            if input_ids[j][i] == self.tokenizer.pad_token_id:  # pad token if the answer is not present
                i = i - 1
                break
            elif i == len(input_ids[j]) - 1:
                break
        score = logits[j][i][token_id_yes] / (logits[j][i][token_id_yes] + logits[j][i][token_id_no])
        entailment.append(score)
    entailment = torch.stack(entailment)
    return entailment
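The entailment score reduces to the probability of the 'Yes' token normalized against the 'No' token at the last answer position. A minimal sketch with random logits and hypothetical token ids (42 for 'Yes', 17 for 'No'; the real ids come from the tokenizer):

import torch
import torch.nn as nn

vocab_size = 100
token_id_yes, token_id_no = 42, 17  # hypothetical ids; obtained from the tokenizer in the real code

logits = torch.randn(1, 5, vocab_size)  # [batch, seq_len, vocab]
probs = nn.Softmax(dim=2)(logits)

last_pos = 4  # index of the last answer token (found by scanning for the pad token above)
p_yes = probs[0, last_pos, token_id_yes]
p_no = probs[0, last_pos, token_id_no]
entailment = p_yes / (p_yes + p_no)  # close to 1.0 means a confident "Yes"
print(float(entailment))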

get_logits(data_batch)

This function is used to get the logits for each input in the data batch.

Parameters:

Name Type Description Default
data_batch dict

A dictionary containing the data batch

required

Returns: logits (torch.Tensor): A tensor containing the logits for each input in the data batch

Source code in aigve/metrics/multi_aspect_metrics/videophy/videophy_metric.py
def get_logits(self, data_batch):
    """
    This function is used to get the logits for each input in the data batch.

    Args:
        data_batch (dict): A dictionary containing the data batch
    Returns:
        logits (torch.Tensor): A tensor containing the logits for each input in the data batch
    """
    # Iterate over each item in the data batch
    for k, v in data_batch.items():
        # Check if the item is a tensor
        if torch.is_tensor(v):
            # Convert float tensors to bfloat16
            if v.dtype == torch.float:
                data_batch[k] = v.bfloat16()
            # Move the tensor to the model's device (e.g., GPU)
            data_batch[k] = data_batch[k].to(self.model.device)

    # print("Data batch: ", data_batch.keys())
    outputs = self.model(pixel_values=data_batch['pixel_values'], video_pixel_values=data_batch['video_pixel_values'],
                    labels=None, \
                    num_images=data_batch['num_images'], num_videos=data_batch['num_videos'], input_ids=data_batch['input_ids'],
                    non_padding_mask=data_batch['non_padding_mask'], \
                    non_media_mask=data_batch['non_media_mask'], prompt_mask=data_batch['prompt_mask'])
    logits = outputs['logits']
    return logits
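The pre-processing loop above only touches tensor values: float tensors are cast to bfloat16 and every tensor is moved to the model's device. A tiny standalone sketch of the same idea with dummy inputs:

import torch

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
data_batch = {
    'pixel_values': torch.randn(1, 3, 224, 224),    # float tensor -> cast to bfloat16
    'input_ids': torch.randint(0, 1000, (1, 32)),   # integer ids keep their dtype
    'num_images': torch.tensor([1]),
}

for k, v in data_batch.items():
    if torch.is_tensor(v):
        if v.dtype == torch.float:
            data_batch[k] = v.bfloat16()
        data_batch[k] = data_batch[k].to(device)

print({k: (v.dtype, v.device) for k, v in data_batch.items()})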

process(data_batch, data_samples)

This function is used to process the data batch and compute the metric.

Parameters:

Name Type Description Default
data_batch dict

A dictionary containing the data batch

required
data_samples list

A list of dictionaries containing the data samples

required
Source code in aigve/metrics/multi_aspect_metrics/videophy/videophy_metric.py
def process(self, data_batch: Any, data_samples: Sequence[dict]) -> None:
    """
    This function is used to process the data batch and compute the metric.

    Args:
        data_batch (dict): A dictionary containing the data batch
        data_samples (list): A list of dictionaries containing the data samples
    """
    logits = self.get_logits(data_batch)
    entails_scores =  self.get_entail(logits, data_batch['input_ids'])

    self.results.extend(entails_scores.cpu().detach().to(torch.float32).numpy().tolist())

VideoScore

Bases: BaseMetric

Source code in aigve/metrics/multi_aspect_metrics/videoscore/videoscore_metric.py
@METRICS.register_module()
class VideoScore(BaseMetric):
    def __init__(self,
                collect_device: Optional[Union[str, torch.device]] = None,
                prefix: Optional[str] = None,
                metric_path: str = None,
                model_path: str = 'TIGER-Lab/VideoScore-v1.1',
                datainfo_path: str = None,
                test_index: int = None,
                 **kwargs):
        """
        Args:
            collect_device (Optional[Union[str, torch.device]]): The device to collect the data on.
            prefix (Optional[str]): The prefix to use for the metric.
            metric_path (str): The path to the metric file.
            model_path (str): The path to the model file.
            datainfo_path (str): The path to the datainfo file.
            test_index (int): The index of the test data.
        """
        super().__init__(collect_device=collect_device, prefix=prefix)
        # self.train_index = train_index
        # TODO: ARE THERE PARAMETERS REQUIRED FOR THIS METRIC?
        self.metric_path = metric_path
        self.model_path = model_path
        self.datainfo_path = datainfo_path
        self.test_index = test_index


        self.model = Idefics2ForSequenceClassification.from_pretrained(self.model_path, torch_dtype=torch.bfloat16).eval()
        self.device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
        self.model.to(self.device)

        self.results = []

    def process(self, data_batch: Any, data_samples: Sequence[dict]) -> None:
        """
        Args:
            data_batch (Any): The data batch to process.
            data_samples (Sequence[dict]): The data samples to process.
        """


        data_batch = {k: v[0].to(self.model.device) for k, v in data_batch.items()}

        with torch.no_grad():
            outputs = self.model(**data_batch)

        logits = outputs.logits.cpu().detach().to(torch.float32).numpy()
        num_aspects = logits.shape[-1]

        aspect_scores = []
        for i in range(num_aspects):
            aspect_scores.append(round(logits[0, i].item(), 3))

        self.results.append(aspect_scores)

    def compute_metrics(self, results: list) -> dict:
        """
        Args:
            results (list): The results to compute the metrics from.
        """
        results = np.array(results)
        mean_scores = np.mean(results, axis=0)  # per-aspect mean over all videos

        return {'visual_quality': results[:, 0].tolist(),
                'temporal_consistency': results[:, 1].tolist(),
                'dynamic_degree': results[:, 2].tolist(),
                'text-to-video_alignment': results[:, 3].tolist(),
                'factual_consistency': results[:, 4].tolist(),
                'summary': {'visual_quality': mean_scores[0], 'temporal_consistency': mean_scores[1],
                            'dynamic_degree': mean_scores[2], 'text-to-video_alignment': mean_scores[3],
                            'factual_consistency': mean_scores[4]}}

__init__(collect_device=None, prefix=None, metric_path=None, model_path='TIGER-Lab/VideoScore-v1.1', datainfo_path=None, test_index=None, **kwargs)

Parameters:

Name Type Description Default
collect_device Optional[Union[str, device]]

The device to collect the data on.

None
prefix Optional[str]

The prefix to use for the metric.

None
metric_path str

The path to the metric file.

None
model_path str

The path to the model file.

'TIGER-Lab/VideoScore-v1.1'
datainfo_path str

The path to the datainfo file.

None
test_index int

The index of the test data.

None
Source code in aigve/metrics/multi_aspect_metrics/videoscore/videoscore_metric.py
def __init__(self,
            collect_device: Optional[Union[str, torch.device]] = None,
            prefix: Optional[str] = None,
            metric_path: str = None,
            model_path: str = 'TIGER-Lab/VideoScore-v1.1',
            datainfo_path: str = None,
            test_index: int = None,
             **kwargs):
    """
    Args:
        collect_device (Optional[Union[str, torch.device]]): The device to collect the data on.
        prefix (Optional[str]): The prefix to use for the metric.
        metric_path (str): The path to the metric file.
        model_path (str): The path to the model file.
        datainfo_path (str): The path to the datainfo file.
        test_index (int): The index of the test data.
    """
    super().__init__(collect_device=collect_device, prefix=prefix)
    # self.train_index = train_index
    # TODO: ARE THERE PARAMETERS REQUIRED FOR THIS METRIC?
    self.metric_path = metric_path
    self.model_path = model_path
    self.datainfo_path = datainfo_path
    self.test_index = test_index


    self.model = Idefics2ForSequenceClassification.from_pretrained(self.model_path, torch_dtype=torch.bfloat16).eval()
    self.device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
    self.model.to(self.device)

    self.results = []

compute_metrics(results)

Parameters:

Name Type Description Default
results list

The results to compute the metrics from.

required
Source code in aigve/metrics/multi_aspect_metrics/videoscore/videoscore_metric.py
def compute_metrics(self, results: list) -> dict:
    """
    Args:
        results (list): The results to compute the metrics from.
    """
    results = np.array(results)
    mean_scores = np.mean(results, axis=0)  # per-aspect mean over all videos

    return {'visual_quality': results[:, 0].tolist(),
            'temporal_consistency': results[:, 1].tolist(),
            'dynamic_degree': results[:, 2].tolist(),
            'text-to-video_alignment': results[:, 3].tolist(),
            'factual_consistency': results[:, 4].tolist(),
            'summary': {'visual_quality': mean_scores[0], 'temporal_consistency': mean_scores[1],
                        'dynamic_degree': mean_scores[2], 'text-to-video_alignment': mean_scores[3],
                        'factual_consistency': mean_scores[4]}}

process(data_batch, data_samples)

Parameters:

Name Type Description Default
data_batch Any

The data batch to process.

required
data_samples Sequence[dict]

The data samples to process.

required
Source code in aigve/metrics/multi_aspect_metrics/videoscore/videoscore_metric.py
def process(self, data_batch: Any, data_samples: Sequence[dict]) -> None:
    """
    Args:
        data_batch (Any): The data batch to process.
        data_samples (Sequence[dict]): The data samples to process.
    """


    data_batch = {k: v[0].to(self.model.device) for k, v in data_batch.items()}

    with torch.no_grad():
        outputs = self.model(**data_batch)

    logits = outputs.logits.cpu().detach().to(torch.float32).numpy()
    num_aspects = logits.shape[-1]

    aspect_scores = []
    for i in range(num_aspects):
        aspect_scores.append(round(logits[0, i].item(), 3))

    self.results.append(aspect_scores)
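Each forward pass produces one logit per evaluation aspect (visual quality, temporal consistency, dynamic degree, text-to-video alignment, factual consistency), and compute_metrics reports a per-aspect mean over all processed videos. A small sketch of that aggregation with dummy per-video scores:

import numpy as np

# Dummy per-video aspect scores (5 aspects each), as appended in process().
results = [
    [3.2, 2.9, 2.5, 3.0, 2.8],
    [2.8, 3.1, 2.7, 2.6, 3.0],
]

results = np.array(results)         # shape [num_videos, num_aspects]
mean_scores = results.mean(axis=0)  # one summary value per aspect
print(mean_scores)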

Organization of this Module

Neural Network-Based Evaluation Metrics

Distribution-Based Evaluation Metrics

Vision-Language Similarity-Based Evaluation Metrics

Vision-Language Understanding-Based Evaluation Metrics

Multi-Faceted Evaluation Metrics