Source code for EduNLP.Pipeline.base

import torch
from EduNLP import logger
from typing import Union, List, Callable, Optional, Dict, Any, Tuple
from .components import PREPROCESSING_PIPES
from ..Pretrain import PretrainedEduTokenizer
from ..ModelZoo.base_model import BaseModel
from transformers.modeling_outputs import ModelOutput
from abc import ABC, abstractmethod

# Recursive type alias for tokenized model inputs: either a single torch.Tensor or an
# arbitrarily nested list of them (the string forward reference lets the alias refer to itself).
GenericTensor = Union["torch.Tensor", List["GenericTensor"]]


class PreProcessingPipeline(object):
    """
    A pipeline for tokenization pre-processing.
    You should use it by calling pipeline('pre-process'), instead of itself directly.

    Parameters
    ----------
    pipe_names: `str` or `List[str]`, optional
        The quickly initialized pipeline components.
        For available pipes, check PREPROCESSING_PIPES in `components`.
        To add components more flexibly with specific arguments or custom names, use `add_pipe`.

    Examples
    ----------
    >>> tkn = PreProcessingPipeline(['is_sif', 'to_sif', 'is_sif', 'seg_describe'])
    >>> tkn.add_pipe(name='seg', symbol='fm', before='seg_describe')
    >>> tkn.component_names
    ['is_sif', 'to_sif', 'is_sif', 'seg', 'seg_describe']
    >>> item = "如图所示,则三角形ABC的面积是_。"
    >>> tkn(item)
    False
    True
    {'t': 3, 'f': 1, 'g': 0, 'm': 1}
    ['如图所示,则三角形', '[FORMULA]', '的面积是', '[MARK]', '。']
    >>> tkn.rename_pipe(0, 'is_sif_lol')
    >>> tkn.add_pipe('to_sif', component=lambda x:x, first=True)
    # This won't succeed for the same name pipe exists
    >>> tkn.add_pipe('identify', component=lambda x:x, before=1)
    >>> tkn.component_names
    ['is_sif_lol', 'identify', 'to_sif', 'is_sif_lol', 'seg', 'seg_describe']
    """

    def __init__(self,
                 pipe_names: Optional[Union[List[str], str]] = None
                 ):
        # name -> callable component. A name may occur several times in the pipeline,
        # but every occurrence shares the single component stored here.
        self._preproc_components = {}
        # Ordered component names executed by __call__.
        self.component_pipeline = []
        if isinstance(pipe_names, str):
            # A single pipe name is documented as valid input; treat it as a one-element list.
            pipe_names = [pipe_names] if pipe_names else []
        if not pipe_names:
            return
        pipe_names = list(pipe_names)
        unknown = [comp_name for comp_name in pipe_names if comp_name not in PREPROCESSING_PIPES]
        if unknown:
            logger.error('Some components not existed!')
            raise ValueError(f'Unknown pipe(s): {unknown}')
        for pipe_name in pipe_names:
            if pipe_name not in self._preproc_components:
                self._preproc_components[pipe_name] = PREPROCESSING_PIPES[pipe_name]()
        self.component_pipeline.extend(pipe_names)

    def __call__(self, inputs):
        """
        Feed `inputs` through every component in pipeline order and return the result.

        A component that raises is logged and skipped: the value it received is passed
        unchanged to the next component (best-effort behaviour).
        """
        for name in self.component_pipeline:
            proc = self._preproc_components[name]
            try:
                inputs = proc(inputs)
            except Exception as e:
                logger.error(f'Pre-processing pipe "{name}" failed: {e}')
        return inputs

    def __len__(self):
        return len(self.component_pipeline)

    def _get_pipe_index(
        self,
        before: Optional[Union[str, int]] = None,
        after: Optional[Union[str, int]] = None,
        first: Optional[bool] = None,
        last: Optional[bool] = None
    ) -> int:
        """
        Resolve the position at which a new component should be inserted.

        Returns `len(self)` (append) when nothing, or only `last`, is requested.

        Raises
        ------
        ValueError
            If more than one option is set, the referenced pipe name is not in the pipeline,
            the index is out of range, or `before`/`after` is neither `str` nor `int`.
        """
        # `first=False` / `last=False` mean "not requested". Normalise them to None so that
        # e.g. add_pipe(..., first=False) falls back to the default instead of failing.
        first = first or None
        last = last or None
        if sum(arg is not None for arg in (before, after, first, last)) > 1:
            logger.error('Only one of before/after/first/last can be set!')
            raise ValueError('Only one of before/after/first/last can be set!')
        if first:
            return 0
        if last or (before is None and after is None):
            return len(self)

        # Exactly one of before/after is set here; `after` inserts one slot further right.
        anchor, offset, label = (before, 0, 'before') if before is not None else (after, 1, 'after')
        if isinstance(anchor, str):
            if anchor not in self.component_pipeline:
                logger.error(f'The {label} pipe does not exists!')
                raise ValueError(f'Pipe "{anchor}" is not in the pipeline')
            return self.component_pipeline.index(anchor) + offset
        if isinstance(anchor, int):
            if anchor < 0 or anchor >= len(self.component_pipeline):
                logger.error(f'The {label} index must be non-negative and less than current length!')
                raise ValueError(f'{label} index {anchor} is out of range')
            return anchor + offset
        logger.error(f'`{label}` must be a pipe name (str) or an index (int)!')
        raise ValueError(f'`{label}` must be a pipe name (str) or an index (int)')

    def add_pipe(
        self,
        name: str,
        component: Optional[Callable] = None,
        before: Optional[Union[str, int]] = None,
        after: Optional[Union[str, int]] = None,
        first: Optional[bool] = None,
        last: Optional[bool] = None,
        *args,
        **kwargs
    ):
        """
        Add a component to the pre-processing pipeline.

        A valid component must be callable and produce what its next component expects.
        Only one of before/after/first/last can be set; the default is `last`.

        Notice:
        1. Avoid using the same pipe more than once; otherwise you can only address the
           copies by index, i.e. `before` and `after` work well only when the pipe is unique.
        2. `*args, **kwargs` are passed to the component constructor in `PREPROCESSING_PIPES`,
           and only when no callable `component` is given.

        Parameters
        ----------
        name: `str`, required
            the name of the pipe
        component: `Callable`, optional
            a custom pipe component; mind the input/output of its neighbouring components.
        before: `str` or `int`, optional
            name or index of the component to insert the new component directly before.
            Index starts from 0.
        after: `str` or `int`, optional
            name or index of the component to insert the new component directly after.
            Index starts from 0.
        first: `bool`, optional
            if true, insert the component first in the pipeline.
        last: `bool`, optional
            if true, insert the component last in the pipeline.

        Raises
        ------
        ValueError
            For an invalid position, an unknown built-in pipe name, or a non-callable component.
        """
        pipe_index = self._get_pipe_index(before, after, first, last)
        if component is None and name not in self._preproc_components:
            if name not in PREPROCESSING_PIPES:
                logger.error(f'Unknown pipe "{name}"')
                raise ValueError(f'Unknown pipe "{name}"')
            self._preproc_components[name] = PREPROCESSING_PIPES[name](*args, **kwargs)
        else:
            if name in self._preproc_components:
                # Names must map to a single component; refuse to shadow an existing one.
                logger.warning(f'One preserved component "{name}" has the same name, inserting is stopped.')
                return
            if not callable(component):
                logger.error(f'Component for pipe "{name}" must be callable!')
                raise ValueError(f'Component for pipe "{name}" must be callable')
            self._preproc_components[name] = component
        self.component_pipeline.insert(pipe_index, name)

    def remove_pipe(
        self,
        pipe: Union[str, int]
    ):
        """
        Remove a component from the pre-processing pipeline and return it.

        With a `str`, the first occurrence of that name is removed; with an `int`, the
        component at that index is removed. The component is forgotten entirely once no
        occurrence of its name is left, so the name can be added again later.

        Raises
        ------
        ValueError
            If a name is given that is not a known component.
        IndexError
            If an index is given that is out of range.
        """
        if isinstance(pipe, str):
            if pipe not in self._preproc_components:
                logger.error(f'Unknown pipe "{pipe}"')
                raise ValueError(f'Unknown pipe "{pipe}"')
            self.component_pipeline.remove(pipe)
            removed = pipe
        else:
            removed = self.component_pipeline.pop(pipe)
        component = self._preproc_components[removed]
        if removed not in self.component_pipeline:
            # Drop the stale entry; otherwise add_pipe would refuse to re-add this name.
            del self._preproc_components[removed]
        return component

    def rename_pipe(
        self,
        old_pipe: Union[str, int],
        new_name: str,
    ):
        """
        Rename a component of the pre-processing pipeline.

        Every occurrence of the old name in the pipeline is renamed.

        Parameters
        ----------
        old_pipe: `str` or `int`, required
            old component name for `str`, or old component index in the pipeline for `int`
        new_name: `str`, required
            new name for the component

        Raises
        ------
        ValueError
            If the old pipe is unknown, or `new_name` already belongs to another component.
        IndexError
            If an index is given that is out of range.
        """
        if isinstance(old_pipe, int):
            old_pipe = self.component_pipeline[old_pipe]
        if old_pipe not in self._preproc_components:
            logger.error(f'Unknown pipe "{old_pipe}"')
            raise ValueError(f'Unknown pipe "{old_pipe}"')
        if new_name == old_pipe:
            return
        if new_name in self._preproc_components:
            # Renaming onto an existing name would silently discard that component.
            logger.error(f'Pipe "{new_name}" already exists!')
            raise ValueError(f'Pipe "{new_name}" already exists')
        self._preproc_components[new_name] = self._preproc_components.pop(old_pipe)
        self.component_pipeline[:] = [new_name if i == old_pipe else i for i in self.component_pipeline]

    @property
    def component_names(self):
        """
        Get the names of pipeline components
        """
        return self.component_pipeline.copy()

    @property
    def pipeline(self):
        """
        Get the processing pipeline consisting of (name, component) tuples.
        """
        return [(name, self._preproc_components[name]) for name in self.component_pipeline]
class Pipeline(ABC):
    """
    Abstract base class shared by every pipeline.

    Workflow of a pipeline:

        Input -> PreProcessingPipeline -> Tokenization -> Model Inference
              -> Post-Processing (downstream task dependent) -> Output

    Subclasses implement `_sanitize_parameters`, `_tokenize`, `_forward` and
    `postprocess`. This class is not meant to be used directly; refer to the
    `pipeline` function in the `Pipeline` package `__init__`.
    """

    def __init__(self,
                 task: Optional[str] = None,
                 model: Optional[BaseModel] = None,
                 tokenizer: Optional[PretrainedEduTokenizer] = None,
                 preproc_pipe_names: Optional[List] = None,
                 **kwargs
                 ):
        self.preproc_pipeline = PreProcessingPipeline(
            [] if preproc_pipe_names is None else preproc_pipe_names
        )
        self.tokenizer = tokenizer
        self.model = model
        self.task = task
        # Default per-stage parameters built from the extra constructor keywords;
        # __call__ layers per-call overrides on top of them.
        default_params = self._sanitize_parameters(**kwargs)
        self._tokenize_params, self._forward_params, self._postprocess_params = default_params

    def __len__(self):
        # Pre-processing components plus one slot for each configured tokenizer/model/task.
        configured = [part for part in (self.tokenizer, self.model, self.task) if part is not None]
        return len(self.preproc_pipeline) + len(configured)

    @abstractmethod
    def _sanitize_parameters(self, **pipeline_parameters: Dict):
        """
        Split excess keyword arguments (from `__init__` or `__call__`) into the
        parameters of the individual stages.

        Must return three dictionaries, used by `_tokenize`, `_forward` and
        `postprocess` respectively.
        """
        raise NotImplementedError("_sanitize_parameters not implemented")

    @abstractmethod
    def _tokenize(self, input_: Any, **tokenize_parameters: Dict) -> Dict[str, GenericTensor]:
        """
        Run the pipeline-specific `input_` through a tokenizer and return a dictionary
        holding everything `_forward` needs.
        """
        raise NotImplementedError("_tokenize not implemented")

    @abstractmethod
    def _forward(self, input_: Dict[str, GenericTensor], **forward_parameters: Dict) -> ModelOutput:
        """
        Run the model on the dictionary prepared by `_tokenize`.
        """
        raise NotImplementedError("_forward not implemented")

    @abstractmethod
    def postprocess(self, model_outputs: ModelOutput, **postprocess_parameters: Dict) -> Any:
        """
        Turn the output of `_forward` into a task-specific, user-friendly result.
        """
        raise NotImplementedError("postprocess not implemented")

    def add_pipe(self, *args, **kwargs):
        """
        Delegate to `PreProcessingPipeline.add_pipe` of the pre-processing stage.
        """
        preproc = self.preproc_pipeline
        return preproc.add_pipe(*args, **kwargs)

    def remove_pipe(self, *args, **kwargs):
        """
        Delegate to `PreProcessingPipeline.remove_pipe` of the pre-processing stage.
        """
        preproc = self.preproc_pipeline
        return preproc.remove_pipe(*args, **kwargs)

    def rename_pipe(self, *args, **kwargs):
        """
        Delegate to `PreProcessingPipeline.rename_pipe` of the pre-processing stage.
        """
        preproc = self.preproc_pipeline
        return preproc.rename_pipe(*args, **kwargs)

    @property
    def component_names(self):
        """
        Names of all pipeline components: the pre-processing pipes, then "tokenizer",
        the model class name and the task, for each one that is set.
        """
        if len(self.preproc_pipeline) > 0:
            names = list(self.preproc_pipeline.component_names)
        else:
            names = []
        if self.tokenizer is not None:
            names.append("tokenizer")
        if self.model is not None:
            names.append(self.model.__class__.__name__)
        if self.task is not None:
            names.append(self.task)
        return names

    @property
    def pipeline(self):
        """
        The processing pipeline as a list of (name, component) tuples. The task entry,
        if present, is paired with None.
        """
        if len(self.preproc_pipeline) > 0:
            steps = self.preproc_pipeline.pipeline
        else:
            steps = []
        if self.tokenizer is not None:
            steps.append(("tokenizer", self.tokenizer))
        if self.model is not None:
            steps.append((self.model.__class__.__name__, self.model))
        if self.task is not None:
            steps.append((self.task, None))
        return steps

    def __call__(self, inputs, *args, num_workers=None, **kwargs):
        """
        Run the pipeline on a single input, or on every element of a list of inputs.

        Extra positional arguments are ignored with a warning. `num_workers` is accepted
        but not used. Keyword arguments are sanitized into per-stage parameters that
        override the defaults captured in `__init__`.
        """
        if args:
            logger.warning(f"Ignoring args: {args}")
        batched = isinstance(inputs, list)
        call_tokenize, call_forward, call_postprocess = self._sanitize_parameters(**kwargs)
        stage_params = (
            {**self._tokenize_params, **call_tokenize},
            {**self._forward_params, **call_forward},
            {**self._postprocess_params, **call_postprocess},
        )
        if not batched:
            return self.run_single(inputs, *stage_params)
        return [self.run_single(item, *stage_params) for item in inputs]

    def run_single(self, inputs, tokenize_params, model_params, postprocess_params):
        """
        Process one input: pre-process it, tokenize it (wrapped in a one-element list),
        run the model, then post-process the model outputs.
        """
        preprocessed = self.preproc_pipeline(inputs)
        encoded = self._tokenize([preprocessed], **tokenize_params)
        raw_outputs = self._forward(encoded, **model_params)
        return self.postprocess(raw_outputs, **postprocess_params)