Starwhale Model Evaluation SDK
@evaluation.predict
The @evaluation.predict decorator defines the inference process in Starwhale Model Evaluation, similar to the map phase in MapReduce. It provides the following core features:
- On a Server instance, it requests the resources needed to run.
- It automatically reads local or remote datasets and passes the data, one item at a time or in batches, to the function decorated by evaluation.predict.
- Through the replicas setting, it implements distributed dataset consumption, which scales horizontally and shortens the time required for model evaluation tasks.
- It automatically stores the return values of the function and the input features of the dataset in the results table, for display in the Web UI and for further use in the evaluate phase.
- The decorated function is called once for each single piece of data or each batch to complete the inference process.
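The distributed consumption enabled by the replicas setting can be pictured with a small pure-Python sketch. It is not the Starwhale scheduler: the round-robin assignment and the shard_rows helper are assumptions made for illustration. The only property taken from the text is that each dataset row is read by exactly one replica.

```python
from typing import Dict, List


def shard_rows(row_ids: List[int], replicas: int) -> Dict[int, List[int]]:
    """Assign every row to exactly one replica (hypothetical round-robin split)."""
    if replicas < 1:
        raise ValueError("replicas must be >= 1")
    shards: Dict[int, List[int]] = {i: [] for i in range(replicas)}
    for position, row_id in enumerate(row_ids):
        shards[position % replicas].append(row_id)
    return shards


shards = shard_rows(list(range(10)), replicas=4)
# Collect the rows back from all shards: no row is lost or duplicated.
all_rows = sorted(r for rows in shards.values() for r in rows)
```

With four replicas, each one processes roughly a quarter of the rows, which is where the shorter evaluation time comes from.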
Parameters
- resources: (dict, optional)
  - Defines the resources required by each predict task when running on a Server instance, including memory, cpu, and nvidia.com/gpu.
  - memory: The unit is bytes; int and float types are supported.
    - Supports setting request and limit as a dictionary, e.g. resources={"memory": {"request": 100 * 1024, "limit": 200 * 1024}}.
    - If only a single number is set, the Python SDK automatically sets request and limit to the same value, e.g. resources={"memory": 100 * 1024} is equivalent to resources={"memory": {"request": 100 * 1024, "limit": 100 * 1024}}.
  - cpu: The unit is the number of CPU cores; int and float types are supported.
    - Supports setting request and limit as a dictionary, e.g. resources={"cpu": {"request": 1, "limit": 2}}.
    - If only a single number is set, the SDK automatically sets request and limit to the same value, e.g. resources={"cpu": 1.5} is equivalent to resources={"cpu": {"request": 1.5, "limit": 1.5}}.
  - nvidia.com/gpu: The unit is the number of GPUs; only the int type is supported.
    - nvidia.com/gpu does not support setting request and limit; only a single number is supported.
  - Note: The resources parameter currently only takes effect on Server instances. For Cloud instances, the same can be achieved by selecting the corresponding resource pool when submitting the evaluation task. Standalone instances do not support this feature at all.
- replicas: (int, optional)
  - The number of replicas to run predict.
  - predict defines a Step, in which there are multiple equivalent Tasks. Each Task runs on a Pod in Cloud/Server instances, and on a Thread in Standalone instances.
  - When multiple replicas are specified, they are equivalent and jointly consume the selected dataset to achieve distributed dataset consumption. In other words, each row in the dataset is read by only one predict replica.
  - The default is 1.
- batch_size: (int, optional)
  - Batch size for passing data from the dataset into the function.
  - The default is 1.
- fail_on_error: (bool, optional)
  - Whether to interrupt the entire model evaluation when the decorated function throws an exception. If you expect some "exceptional" data to cause failures but don't want them to interrupt the overall evaluation, set fail_on_error=False.
  - The default is True.
- auto_log: (bool, optional)
  - Whether to automatically log the return values of the function and the input features of the dataset to the results table.
  - The default is True.
- log_mode: (str, optional)
  - When auto_log=True, you can set log_mode to log the return values in plain or pickle format.
  - The default is pickle.
- log_dataset_features: (List[str], optional)
  - When auto_log=True, you can selectively log certain features from the dataset via this parameter.
  - By default, all features are logged.
- needs: (List[Callable], optional)
  - Defines the prerequisites for this task to run; the needs syntax can be used to build a DAG.
  - needs accepts functions decorated by @evaluation.predict, @evaluation.evaluate, and @handler.
  - The default is empty, i.e. the task does not depend on any other tasks.
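The request/limit rules of the resources parameter can be sketched in plain Python. This is an illustration of the documented behavior, not the SDK's code: normalize_resources is a hypothetical helper, and requiring both request and limit in the dictionary form is an assumption of this sketch.

```python
from typing import Any, Dict


def normalize_resources(resources: Dict[str, Any]) -> Dict[str, Any]:
    """Expand single numbers into explicit request/limit pairs (illustrative only)."""
    normalized: Dict[str, Any] = {}
    for name, value in resources.items():
        if name == "nvidia.com/gpu":
            # GPUs accept only a single integer, never a request/limit dict.
            if isinstance(value, bool) or not isinstance(value, int):
                raise TypeError("nvidia.com/gpu must be an int")
            normalized[name] = value
        elif isinstance(value, dict):
            # Assumption of this sketch: both keys are present.
            normalized[name] = {"request": value["request"], "limit": value["limit"]}
        else:
            # A single number sets request and limit to the same value.
            normalized[name] = {"request": value, "limit": value}
    return normalized


spec = normalize_resources({"memory": 100 * 1024, "cpu": 1.5, "nvidia.com/gpu": 1})
```

Because memory is expressed in bytes, 100 * 1024 here means 100 KiB; a value such as 1 * 1024 * 1024 * 1024 would be needed for 1 GiB.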
Input
The decorated function needs to define input parameters to accept the dataset data and related information. The following patterns are supported:
- data:
  - data is a dict type that can read the features of the dataset.
  - When batch_size=1 or batch_size is not set, the label feature can be read through data['label'] or data.label.
  - When batch_size is set to > 1, data is a list.
from starwhale import evaluation

@evaluation.predict
def predict(data):
    print(data['label'])
    print(data.label)
- data + external:
  - data is a dict type that can read the features of the dataset.
  - external is also a dict, with the keys index, index_with_dataset, dataset_info, context and dataset_uri. These attributes can be used for further fine-grained processing.
    - index: The index of the dataset row.
    - index_with_dataset: The index with the dataset info.
    - dataset_info: starwhale.core.dataset.tabular.TabularDatasetInfo class.
    - context: starwhale.Context class.
    - dataset_uri: starwhale.base.uri.resource.Resource class.
from starwhale import evaluation

@evaluation.predict
def predict(data, external):
    print(data['label'])
    print(data.label)
    print(external["context"])
    print(external["dataset_uri"])
- data + **kw:
  - data is a dict type that can read the features of the dataset.
  - kw is a dict that contains external.
from starwhale import evaluation

@evaluation.predict
def predict(data, **kw):
    print(kw["external"]["context"])
    print(kw["external"]["dataset_uri"])
- *args + **kwargs:
  - The first argument of the args list is data.
from starwhale import evaluation

@evaluation.predict
def predict(*args, **kw):
    print(args[0].label)
    print(args[0]["label"])
    print(kw["external"]["context"])
- **kwargs:
from starwhale import evaluation

@evaluation.predict
def predict(**kw):
    print(kw["data"].label)
    print(kw["data"]["label"])
    print(kw["external"]["context"])
- *args:
  - *args does not contain external.
from starwhale import evaluation

@evaluation.predict
def predict(*args):
    print(args[0].label)
    print(args[0]["label"])
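When batch_size is greater than 1, the function receives a list rather than a single row. The sketch below shows that shape without the SDK: the rows are plain dicts standing in for dataset rows, predict_batch is a hypothetical name, the txt feature is invented, and returning one value per row is an assumption of this example.

```python
from typing import Any, Dict, List


def predict_batch(data: List[Dict[str, Any]]) -> List[int]:
    """Return one prediction per row in the batch (toy model: length of the text)."""
    return [len(row["txt"]) for row in data]


# Two plain dicts stand in for a batch of dataset rows.
batch = [{"txt": "hello", "label": 5}, {"txt": "hi", "label": 2}]
predictions = predict_batch(batch)
```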
Examples
from starwhale import evaluation

@evaluation.predict
def predict_image(data):
    ...

@evaluation.predict(
    dataset="mnist/version/latest",
    batch_size=32,
    replicas=4,
    needs=[predict_image],
)
def predict_batch_images(batch_data):
    ...

@evaluation.predict(
    resources={
        "nvidia.com/gpu": 1,
        "cpu": {"request": 1, "limit": 2},
        "memory": 200 * 1024 * 1024,  # 200 MiB; the unit is bytes
    },
    log_mode="plain",
)
def predict_with_resources(data):
    ...

@evaluation.predict(
    replicas=1,
    log_mode="plain",
    log_dataset_features=["txt", "img", "label"],
)
def predict_with_selected_features(data):
    ...
@evaluation.evaluate
@evaluation.evaluate is a decorator that defines the evaluation process in Starwhale Model Evaluation, similar to the reduce phase in MapReduce. It provides the following core features:
- On a Server instance, it requests the resources needed to run.
- It automatically reads the data recorded in the results table during the predict phase and passes it into the function as an iterator.
- The evaluate phase runs only one replica; unlike the predict phase, it cannot define the replicas parameter.
Parameters
- resources: (dict, optional)
  - Consistent with the resources parameter definition in @evaluation.predict.
- needs: (List[Callable], optional)
  - Consistent with the needs parameter definition in @evaluation.predict.
  - In the common case, it depends on a function decorated by @evaluation.predict.
- use_predict_auto_log: (bool, optional)
  - Defaults to True, which passes an iterator that can traverse the predict results to the function.
Input
- When use_predict_auto_log=True (default), an iterator that can traverse the predict results is passed into the function.
  - Each iterated object is a dictionary containing two keys: output and input.
    - output is the element returned by the predict stage function.
    - input is the features of the corresponding dataset row during the inference process, as a dictionary.
- When use_predict_auto_log=False, no parameters are passed into the function.
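As a minimal sketch of consuming such an iterator, the function below computes accuracy from records shaped like the ones described above. The records are hand-written stand-ins rather than real predict results, and the label feature name is an assumption.

```python
from typing import Any, Dict, Iterator


def accuracy(predict_results: Iterator[Dict[str, Any]]) -> float:
    """Compare each predicted output with the label stored in the input features."""
    total = 0
    correct = 0
    for record in predict_results:
        total += 1
        if record["output"] == record["input"]["label"]:
            correct += 1
    return correct / total if total else 0.0


# Hand-written stand-ins for the records produced by the predict phase.
fake_results = iter(
    [
        {"output": 1, "input": {"label": 1}},
        {"output": 0, "input": {"label": 1}},
        {"output": 7, "input": {"label": 7}},
        {"output": 3, "input": {"label": 3}},
    ]
)
score = accuracy(fake_results)
```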
Examples
from starwhale import evaluation

# predict_image is the function decorated by @evaluation.predict in the examples above.
@evaluation.evaluate(needs=[predict_image])
def evaluate_results(predict_result_iter):
    ...

@evaluation.evaluate(
    use_predict_auto_log=False,
    needs=[predict_image],
)
def evaluate_results():
    ...
class Evaluation
starwhale.Evaluation implements the abstraction for Starwhale Model Evaluation, and can perform operations like logging and scanning for Model Evaluation on Standalone/Server/Cloud instances, to record and retrieve metrics.
__init__
The __init__ function initializes an Evaluation object.
class Evaluation:
    def __init__(self, id: str, project: Project | str) -> None:
Parameters
- id: (str, required)
  - The UUID of the Model Evaluation, generated automatically by Starwhale.
- project: (Project|str, required)
  - Project object or Project URI str.
Example
from starwhale import Evaluation
standalone_e = Evaluation("fcd1206bf1694fce8053724861c7874c", project="self")
server_e = Evaluation("fcd1206bf1694fce8053724861c7874c", project="cloud://server/project/starwhale:starwhale")
cloud_e = Evaluation("2ddab20df9e9430dbd73853d773a9ff6", project="https://cloud.starwhale.cn/project/starwhale:llm-leaderboard")
from_context
from_context is a classmethod that obtains the Evaluation object under the current Context. from_context only takes effect in the task runtime environment. Calling this method outside a task runtime environment raises a RuntimeError exception, indicating that the Starwhale Context has not been properly set.
@classmethod
def from_context(cls) -> Evaluation:
Example
from starwhale import Evaluation

with Evaluation.from_context() as e:
    e.log("label/1", 1, {"loss": 0.99, "accuracy": 0.98})
log
log is a method that logs evaluation metrics to a specific table, which can then be viewed on the Server/Cloud instance's web page or through the scan method.
def log(
    self, category: str, id: t.Union[str, int], metrics: t.Dict[str, t.Any]
) -> None:
Parameters
- category: (str, required)
  - The category of the logged metrics, which is used as the suffix of the Starwhale Datastore table name.
  - Each category corresponds to a Starwhale Datastore table. These tables are isolated by the evaluation task ID and do not affect each other.
- id: (str|int, required)
  - The ID of the logged record, unique within the table.
  - For the same table, only str or int can be used as the ID type.
- metrics: (dict, required)
  - A dict that holds the metrics in key-value format.
  - Keys are of str type.
  - Values can be constant types like int, float, str, bytes, bool, or compound types like tuple, list, dict. Artifact types such as starwhale.Image, starwhale.Video, starwhale.Audio, starwhale.Text and starwhale.Binary are also supported.
    - When the value contains a dict type, the Starwhale SDK automatically flattens the dict for better visualization and metric comparison.
    - For example, if metrics is {"test": {"loss": 0.99, "prob": [0.98,0.99]}, "image": [Image, Image]}, it is stored as {"test/loss": 0.99, "test/prob": [0.98, 0.99], "image/0": Image, "image/1": Image} after flattening.
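The dict flattening can be approximated with a short recursive helper. This sketch is not the SDK's implementation: flatten_metrics is a hypothetical name, and it only flattens nested dicts into slash-separated keys. The index-based expansion of artifact lists shown in the example above is deliberately not modeled.

```python
from typing import Any, Dict


def flatten_metrics(metrics: Dict[str, Any], prefix: str = "") -> Dict[str, Any]:
    """Flatten nested dicts into 'parent/child' keys; lists are left untouched."""
    flat: Dict[str, Any] = {}
    for key, value in metrics.items():
        path = f"{prefix}/{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten_metrics(value, path))
        else:
            flat[path] = value
    return flat


flat = flatten_metrics({"test": {"loss": 0.99, "prob": [0.98, 0.99]}, "epoch": 3})
```

Flat keys like test/loss map directly onto table columns, which is what makes metrics from different evaluations easy to compare.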
Example
from starwhale import Evaluation
evaluation_store = Evaluation.from_context()
evaluation_store.log("label/1", 1, {"loss": 0.99, "accuracy": 0.98})
evaluation_store.log("ppl", "1", {"a": "test", "b": 1})
scan
scan is a method that returns an iterator for reading data from certain model evaluation tables.
def scan(
    self,
    category: str,
    start: t.Any = None,
    end: t.Any = None,
    keep_none: bool = False,
    end_inclusive: bool = False,
) -> t.Iterator:
Parameters
- category: (str, required)
  - Same meaning as the category parameter in the log method.
- start: (Any, optional)
  - Start key. If not specified, scanning starts from the first data item in the table.
- end: (Any, optional)
  - End key. If not specified, scanning continues to the end of the table.
- keep_none: (bool, optional)
  - Whether to return columns with None values. They are not returned by default.
- end_inclusive: (bool, optional)
  - Whether to include the row corresponding to end. It is not included by default.
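The range parameters can be illustrated with a toy in-memory table. This is only a model of the documented semantics, not the datastore's code: scan_table is a hypothetical function, and treating start as inclusive is an assumption of this sketch.

```python
from typing import Any, Dict, Iterator, Optional


def scan_table(
    table: Dict[Any, Dict[str, Any]],
    start: Optional[Any] = None,
    end: Optional[Any] = None,
    keep_none: bool = False,
    end_inclusive: bool = False,
) -> Iterator[Dict[str, Any]]:
    """Yield rows in key order between start and end (toy in-memory model)."""
    for key in sorted(table):
        if start is not None and key < start:
            continue
        if end is not None and (key > end or (key == end and not end_inclusive)):
            break
        row = {"id": key, **table[key]}
        if not keep_none:
            # Drop columns whose value is None unless the caller asks to keep them.
            row = {k: v for k, v in row.items() if v is not None}
        yield row


table = {
    1: {"loss": 0.9, "note": None},
    2: {"loss": 0.8, "note": "ok"},
    3: {"loss": 0.7, "note": None},
}
half_open = [row["id"] for row in scan_table(table, start=1, end=3)]
closed = [row["id"] for row in scan_table(table, start=1, end=3, end_inclusive=True)]
first_row = next(scan_table(table))
```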
Example
from starwhale import Evaluation
evaluation_store = Evaluation(id="2ddab20df9e9430dbd73853d773a9ff6", project="https://cloud.starwhale.cn/projects/349")
results = [data for data in evaluation_store.scan("label/0")]
flush
flush is a method that immediately flushes the metrics logged by the log method to the datastore and OSS storage. If the flush method is not called, Evaluation automatically flushes data to storage when it is finally closed.
def flush(self, category: str, artifacts_flush: bool = True) -> None:
Parameters
- category: (str, required)
  - Same meaning as the category parameter in the log method.
- artifacts_flush: (bool, optional)
  - Whether to dump artifact data to blob files and upload them to related storage. Default is True.
log_result
log_result is a method that logs evaluation metrics to the results table, equivalent to calling the log method with category set to results. The results table is generally used to store inference results. By default, @evaluation.predict stores the return value of the decorated function in the results table; you can also store results manually with log_result.
def log_result(self, id: t.Union[str, int], metrics: t.Dict[str, t.Any]) -> None:
Parameters
- id: (str|int, required)
  - The ID of the record, unique within the results table.
  - For the results table, only str or int can be used as the ID type.
- metrics: (dict, required)
  - Same definition as the metrics parameter in the log method.
Example
from starwhale import Evaluation
evaluation_store = Evaluation(id="2ddab20df9e9430dbd73853d773a9ff6", project="self")
evaluation_store.log_result(1, {"loss": 0.99, "accuracy": 0.98})
evaluation_store.log_result(2, {"loss": 0.98, "accuracy": 0.99})
scan_results
scan_results is a method that returns an iterator for reading data from the results table.
def scan_results(
    self,
    start: t.Any = None,
    end: t.Any = None,
    keep_none: bool = False,
    end_inclusive: bool = False,
) -> t.Iterator:
Parameters
- start: (Any, optional)
  - Start key. If not specified, scanning starts from the first data item in the table.
  - Same definition as the start parameter in the scan method.
- end: (Any, optional)
  - End key. If not specified, scanning continues to the end of the table.
  - Same definition as the end parameter in the scan method.
- keep_none: (bool, optional)
  - Whether to return columns with None values. They are not returned by default.
  - Same definition as the keep_none parameter in the scan method.
- end_inclusive: (bool, optional)
  - Whether to include the row corresponding to end. It is not included by default.
  - Same definition as the end_inclusive parameter in the scan method.
Example
from starwhale import Evaluation
evaluation_store = Evaluation(id="2ddab20df9e9430dbd73853d773a9ff6", project="self")
evaluation_store.log_result(1, {"loss": 0.99, "accuracy": 0.98})
evaluation_store.log_result(2, {"loss": 0.98, "accuracy": 0.99})
results = [data for data in evaluation_store.scan_results()]
flush_results
flush_results is a method that immediately flushes the metrics logged by the log_result method to the datastore and OSS storage. If the flush_results method is not called, Evaluation automatically flushes data to storage when it is finally closed.
def flush_results(self, artifacts_flush: bool = True) -> None:
Parameters
- artifacts_flush: (bool, optional)
  - Whether to dump artifact data to blob files and upload them to related storage. Default is True.
  - Same definition as the artifacts_flush parameter in the flush method.
log_summary
log_summary is a method that logs certain metrics to the summary table. The evaluation page on Server/Cloud instances displays data from the summary table.
On each call, Starwhale automatically uses the unique ID of this evaluation as the row ID of the table and updates that row. The method can be called multiple times during one evaluation to update different columns.
Each project has one summary table. All evaluation tasks under that project write summary information to this table, which makes it easy to compare evaluations of different models.
def log_summary(self, *args: t.Any, **kw: t.Any) -> None:
Like the log method, log_summary automatically flattens the dict.
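The row-per-evaluation behavior can be modeled with a dictionary keyed by evaluation ID. This is a conceptual sketch only: summary_table and log_summary_row are hypothetical stand-ins, not part of the Starwhale SDK.

```python
from typing import Any, Dict

# Hypothetical in-memory stand-in for a project's summary table: one row per evaluation ID.
summary_table: Dict[str, Dict[str, Any]] = {}


def log_summary_row(evaluation_id: str, **metrics: Any) -> None:
    """Merge new columns into the row that belongs to this evaluation."""
    summary_table.setdefault(evaluation_id, {}).update(metrics)


log_summary_row("eval-a", loss=0.99)
log_summary_row("eval-a", accuracy=0.98)  # a second call updates a different column
log_summary_row("eval-b", loss=0.75)  # another evaluation gets its own row
```

Since every evaluation in the project lands in the same table, comparing the loss column across rows compares the evaluations directly.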
Example
from starwhale import Evaluation
evaluation_store = Evaluation(id="2ddab20df9e9430dbd73853d773a9ff6", project="https://cloud.starwhale.cn/projects/349")
evaluation_store.log_summary(loss=0.99)
evaluation_store.log_summary(loss=0.99, accuracy=0.99)
evaluation_store.log_summary({"loss": 0.99, "accuracy": 0.99})
get_summary
get_summary is a method that returns the information logged by log_summary.
def get_summary(self) -> t.Dict:
flush_summary
flush_summary is a method that immediately flushes the metrics logged by the log_summary method to the datastore and OSS storage. If the flush_summary method is not called, Evaluation automatically flushes data to storage when it is finally closed.
def flush_summary(self, artifacts_flush: bool = True) -> None:
Parameters
- artifacts_flush: (bool, optional)
  - Whether to dump artifact data to blob files and upload them to related storage. Default is True.
  - Same definition as the artifacts_flush parameter in the flush method.
flush_all
flush_all is a method that immediately flushes the metrics logged by the log, log_result and log_summary methods to the datastore and OSS storage. If the flush_all method is not called, Evaluation automatically flushes data to storage when it is finally closed.
def flush_all(self, artifacts_flush: bool = True) -> None:
Parameters
- artifacts_flush: (bool, optional)
  - Whether to dump artifact data to blob files and upload them to related storage. Default is True.
  - Same definition as the artifacts_flush parameter in the flush method.
get_tables
get_tables is a method that returns the names of all tables generated during model evaluation. Note that this function does not return the summary table name.
def get_tables(self) -> t.List[str]:
close
close is a method that closes the Evaluation object. Calling close automatically flushes data to storage. Evaluation also implements the __enter__ and __exit__ methods, so the with syntax can replace manual close calls.
def close(self) -> None:
Example
from starwhale import Evaluation
evaluation_store = Evaluation(id="2ddab20df9e9430dbd73853d773a9ff6", project="https://cloud.starwhale.cn/projects/349")
evaluation_store.log_summary(loss=0.99)
evaluation_store.close()
# auto close when the with-context exits.
with Evaluation.from_context() as e:
    e.log_summary(loss=0.99)
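The relationship between close, flushing and the with syntax follows the usual Python context-manager pattern. The class below is a toy stand-in written for illustration (BufferedStore is not a Starwhale class); it only shows why leaving a with block is enough to flush buffered records.

```python
from typing import Any, Dict, List, Tuple


class BufferedStore:
    """Toy stand-in that buffers records and flushes them when closed."""

    def __init__(self) -> None:
        self.pending: List[Tuple[str, Dict[str, Any]]] = []
        self.flushed: List[Tuple[str, Dict[str, Any]]] = []
        self.closed = False

    def log(self, category: str, metrics: Dict[str, Any]) -> None:
        self.pending.append((category, metrics))

    def close(self) -> None:
        # Closing flushes whatever is still buffered.
        self.flushed.extend(self.pending)
        self.pending.clear()
        self.closed = True

    def __enter__(self) -> "BufferedStore":
        return self

    def __exit__(self, exc_type: Any, exc: Any, tb: Any) -> None:
        self.close()


with BufferedStore() as store:
    store.log("summary", {"loss": 0.99})
# Leaving the with block has called close(), so the record is flushed.
```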
@handler
@handler is a decorator that provides the following functionalities:
- On a Server instance, it requests the required resources to run.
- It can control the number of replicas.
- Multiple handlers can form a DAG through dependency relationships to control the execution workflow.
- It can expose ports externally to run like a web handler.
@fine_tune, @evaluation.predict and @evaluation.evaluate can be considered applications of @handler in specific areas. @handler is the underlying implementation of these decorators and is more fundamental and flexible.
@classmethod
def handler(
    cls,
    resources: t.Optional[t.Dict[str, t.Any]] = None,
    replicas: int = 1,
    needs: t.Optional[t.List[t.Callable]] = None,
    name: str = "",
    expose: int = 0,
    require_dataset: bool = False,
) -> t.Callable:
Parameters
- resources: (dict, optional)
  - Consistent with the resources parameter definition in @evaluation.predict.
- needs: (List[Callable], optional)
  - Consistent with the needs parameter definition in @evaluation.predict.
- replicas: (int, optional)
  - Consistent with the replicas parameter definition in @evaluation.predict.
- name: (str, optional)
  - The name displayed for the handler.
  - If not specified, the decorated function's name is used.
- expose: (int, optional)
  - The port exposed externally. When running a web handler, the exposed port needs to be declared.
  - The default is 0, meaning no port is exposed.
  - Currently only one port can be exposed.
- require_dataset: (bool, optional)
  - Defines whether this handler requires a dataset when running.
  - If require_dataset=True, the user is required to input a dataset when creating an evaluation task on the Server/Cloud instance web page. If require_dataset=False, the user does not need to specify a dataset on the web page.
  - The default is False.
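How needs relationships turn into an execution order can be sketched with the standard library's graphlib module (Python 3.9+). The handler names are hypothetical and Starwhale's own scheduler is not shown here; the sketch only demonstrates that a handler runs after everything it lists in needs.

```python
from graphlib import TopologicalSorter
from typing import Dict, List

# Hypothetical handler names mapped to the handlers they list in needs.
needs: Dict[str, List[str]] = {
    "predict": [],
    "evaluate": ["predict"],
    "report": ["evaluate"],
}

# static_order() yields each node only after all of its predecessors.
order = list(TopologicalSorter(needs).static_order())
```

A cycle in the needs graph would make TopologicalSorter raise graphlib.CycleError, which matches the requirement that the dependencies form a DAG.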
Examples
from starwhale import handler
import gradio

@handler(resources={"cpu": 1, "nvidia.com/gpu": 1}, replicas=3)
def my_handler():
    ...

@handler(needs=[my_handler])
def my_another_handler():
    ...

@handler(expose=7860)
def chatbot():
    with gradio.Blocks() as server:
        ...
    server.launch(server_name="0.0.0.0", server_port=7860)
@fine_tune
fine_tune is a decorator that defines the fine-tuning process for model training.
Some restrictions and usage suggestions:
- fine_tune has only one replica.
- fine_tune requires dataset input.
- Generally, the dataset is obtained through Context.get_runtime_context() at the start of fine_tune.
- Generally, at the end of fine_tune, the fine-tuned Starwhale model package is generated through starwhale.model.build, which will be automatically copied to the corresponding evaluation project.
Parameters
- resources: (dict, optional)
  - Consistent with the resources parameter definition in @evaluation.predict.
- needs: (List[Callable], optional)
  - Consistent with the needs parameter definition in @evaluation.predict.
Examples
from starwhale import dataset
from starwhale import model as starwhale_model
from starwhale import fine_tune, Context

@fine_tune(resources={"nvidia.com/gpu": 1})
def llama_fine_tuning():
    ctx = Context.get_runtime_context()

    if len(ctx.dataset_uris) == 2:
        # TODO: use more graceful way to get train and eval dataset
        train_dataset = dataset(ctx.dataset_uris[0], readonly=True, create="forbid")
        eval_dataset = dataset(ctx.dataset_uris[1], readonly=True, create="forbid")
    elif len(ctx.dataset_uris) == 1:
        train_dataset = dataset(ctx.dataset_uris[0], readonly=True, create="forbid")
        eval_dataset = None
    else:
        raise ValueError("Only support 1 or 2 datasets(train and eval dataset) for now")

    # User training code; train_llama and get_model_name are user-defined helpers.
    train_llama(
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
    )

    model_name = get_model_name()
    starwhale_model.build(name=f"llama-{model_name}-qlora-ft")
@multi_classification
The @multi_classification decorator uses the sklearn library to analyze results for multi-classification problems, outputting the confusion matrix, ROC, AUC, etc., and writing them to the related tables in the Starwhale Datastore.
The decorated function must return either (label, result) or (label, result, probability_matrix).
def multi_classification(
    confusion_matrix_normalize: str = "all",
    show_hamming_loss: bool = True,
    show_cohen_kappa_score: bool = True,
    show_roc_auc: bool = True,
    all_labels: t.Optional[t.List[t.Any]] = None,
) -> t.Any:
Parameters
- confusion_matrix_normalize: (str, optional)
  - Accepts one of three values:
    - true: rows
    - pred: columns
    - all: rows+columns
- show_hamming_loss: (bool, optional)
  - Whether to calculate the Hamming loss.
  - The default is True.
- show_cohen_kappa_score: (bool, optional)
  - Whether to calculate the Cohen kappa score.
  - The default is True.
- show_roc_auc: (bool, optional)
  - Whether to calculate ROC/AUC. To calculate it, the function needs to return a (label, result, probability_matrix) tuple; otherwise a (label, result) tuple is sufficient.
  - The default is True.
- all_labels: (List, optional)
  - Defines all the labels.
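The three normalization modes can be made concrete with a dependency-free sketch. It follows the meaning these names have in sklearn's confusion_matrix (normalize over true labels, over predicted labels, or over the whole population); the helper functions are written for this example and are not what the decorator calls internally.

```python
from typing import List


def confusion_matrix(y_true: List[int], y_pred: List[int], n: int) -> List[List[int]]:
    """Rows are true labels, columns are predicted labels."""
    matrix = [[0] * n for _ in range(n)]
    for t_label, p_label in zip(y_true, y_pred):
        matrix[t_label][p_label] += 1
    return matrix


def normalize(matrix: List[List[int]], mode: str) -> List[List[float]]:
    """Normalize over rows (true), columns (pred), or the whole matrix (all)."""
    n = len(matrix)
    if mode == "true":
        sums = [sum(row) for row in matrix]
        return [[v / sums[i] if sums[i] else 0.0 for v in row] for i, row in enumerate(matrix)]
    if mode == "pred":
        sums = [sum(matrix[i][j] for i in range(n)) for j in range(n)]
        return [[v / sums[j] if sums[j] else 0.0 for j, v in enumerate(row)] for row in matrix]
    if mode == "all":
        total = sum(sum(row) for row in matrix)
        return [[v / total if total else 0.0 for v in row] for row in matrix]
    raise ValueError(f"unknown mode: {mode}")


cm = confusion_matrix([0, 0, 1, 1], [0, 1, 1, 1], n=2)
by_true = normalize(cm, "true")  # each row sums to 1
by_pred = normalize(cm, "pred")  # each column sums to 1
by_all = normalize(cm, "all")    # the whole matrix sums to 1
```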
Examples
import typing as t

from starwhale import multi_classification

@multi_classification(
    confusion_matrix_normalize="all",
    show_hamming_loss=True,
    show_cohen_kappa_score=True,
    show_roc_auc=True,
    all_labels=[i for i in range(0, 10)],
)
def evaluate(ppl_result) -> t.Tuple[t.List[int], t.List[int], t.List[t.List[float]]]:
    label, result, probability_matrix = [], [], []
    return label, result, probability_matrix

# With show_roc_auc=False, a (label, result) tuple is sufficient.
@multi_classification(
    confusion_matrix_normalize="all",
    show_hamming_loss=True,
    show_cohen_kappa_score=True,
    show_roc_auc=False,
    all_labels=[i for i in range(0, 10)],
)
def evaluate(ppl_result) -> t.Tuple[t.List[int], t.List[int]]:
    label, result = [], []
    return label, result
PipelineHandler
The PipelineHandler class provides a default model evaluation workflow definition that requires users to implement the predict and evaluate functions.
The PipelineHandler is equivalent to using the @evaluation.predict and @evaluation.evaluate decorators together - the usage looks different but the underlying model evaluation process is the same.
Note that PipelineHandler currently does not support defining resources parameters.
Users need to implement the following functions:
- predict: Defines the inference process, equivalent to a function decorated with @evaluation.predict.
- evaluate: Defines the evaluation process, equivalent to a function decorated with @evaluation.evaluate.
import typing as t
from typing import Any, Iterator
from abc import ABCMeta, abstractmethod

class PipelineHandler(metaclass=ABCMeta):
    def __init__(
        self,
        predict_batch_size: int = 1,
        ignore_error: bool = False,
        predict_auto_log: bool = True,
        predict_log_mode: str = PredictLogMode.PICKLE.value,
        predict_log_dataset_features: t.Optional[t.List[str]] = None,
        **kwargs: t.Any,
    ) -> None:
        self.context = Context.get_runtime_context()
        ...

    def predict(self, data: Any, **kw: Any) -> Any:
        raise NotImplementedError

    def evaluate(self, ppl_result: Iterator) -> Any:
        raise NotImplementedError
Parameters
- predict_batch_size: (int, optional)
  - Equivalent to the batch_size parameter in @evaluation.predict.
  - Default is 1.
- ignore_error: (bool, optional)
  - Corresponds to the fail_on_error parameter in @evaluation.predict with the opposite meaning: ignore_error=True behaves like fail_on_error=False.
  - Default is False.
- predict_auto_log: (bool, optional)
  - Equivalent to the auto_log parameter in @evaluation.predict.
  - Default is True.
- predict_log_mode: (str, optional)
  - Equivalent to the log_mode parameter in @evaluation.predict.
  - Default is pickle.
- predict_log_dataset_features: (List[str], optional)
  - Equivalent to the log_dataset_features parameter in @evaluation.predict.
  - Default is None, which records all features.
PipelineHandler.run Decorator
The PipelineHandler.run decorator can be used to describe resources for the predict and evaluate methods, supporting definitions of replicas and resources:
- The PipelineHandler.run decorator can only decorate predict and evaluate methods in subclasses inheriting from PipelineHandler.
- The predict method can set the replicas parameter. The replicas value for the evaluate method is always 1.
- The resources parameter is defined and used in the same way as the resources parameter in @evaluation.predict or @evaluation.evaluate.
- The PipelineHandler.run decorator is optional.
- The PipelineHandler.run decorator only takes effect on Server and Cloud instances, not on Standalone instances, which do not support resource definition.
@classmethod
def run(
    cls, resources: t.Optional[t.Dict[str, t.Any]] = None, replicas: int = 1
) -> t.Callable:
Examples
import typing as t

import torch
from starwhale import Image, PipelineHandler

class Example(PipelineHandler):
    def __init__(self) -> None:
        super().__init__()
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = self._load_model(self.device)

    @PipelineHandler.run(replicas=4, resources={"memory": 1 * 1024 * 1024 * 1024, "nvidia.com/gpu": 1})  # 1G Memory, 1 GPU
    def predict(self, data: t.Dict):
        data_tensor = self._pre(data.img)
        output = self.model(data_tensor)
        return self._post(output)

    @PipelineHandler.run(resources={"memory": 1 * 1024 * 1024 * 1024})  # 1G Memory
    def evaluate(self, ppl_result):
        result, label, pr = [], [], []
        for _data in ppl_result:
            label.append(_data["input"]["label"])
            result.extend(_data["output"][0])
            pr.extend(_data["output"][1])
        return label, result, pr

    def _pre(self, input: Image) -> torch.Tensor:
        ...

    def _post(self, input):
        ...

    def _load_model(self, device):
        ...
Context
The context information passed during model evaluation, including Project, Task ID, etc. The Context content is automatically injected and can be used in the following ways:
- Inherit the PipelineHandler class and use the self.context object.
- Get it through Context.get_runtime_context().
Note that Context can only be used during model evaluation, otherwise the program will throw an exception.
Currently Context can get the following values:
- project: str
  - Project name.
- version: str
  - Unique ID of model evaluation.
- step: str
  - Step name.
- total: int
  - Total number of Tasks under the Step.
- index: int
  - Task index number, starting from 0.
- dataset_uris: List[str]
  - List of Starwhale dataset URIs.
Examples
import typing as t

from starwhale import Context, PipelineHandler

def func():
    ctx = Context.get_runtime_context()
    print(ctx.project)
    print(ctx.version)
    print(ctx.step)
    ...

class Example(PipelineHandler):
    def predict(self, data: t.Dict):
        print(self.context.project)
        print(self.context.version)
        print(self.context.step)
@starwhale.api.service.api
@starwhale.api.service.api is a decorator that provides a simple Gradio-based Web Handler input definition. When a Web Service is launched with the swcli model serve command, it accepts external requests and returns inference results to the user, enabling online evaluation.
Examples
import typing as t

import gradio
from starwhale import Image
from starwhale.api.service import api

def predict_image(img):
    ...

@api(gradio.File(), gradio.Label())
def predict_view(file: t.Any) -> t.Any:
    with open(file.name, "rb") as f:
        data = Image(f.read(), shape=(28, 28, 1))
    _, prob = predict_image({"img": data})
    return {i: p for i, p in enumerate(prob)}
starwhale.api.service.Service
If you want to customize the web service implementation, you can subclass Service and override the serve method.
import typing as t

from starwhale.api.service import Service

class CustomService(Service):
    def serve(self, addr: str, port: int, handler_list: t.List[str] = None) -> None:
        ...

svc = CustomService()

@svc.api(...)
def handler(data):
    ...
Notes:
- Handlers added with PipelineHandler.add_api and with the api decorator or Service.api can work together.
- If you use a custom Service, you need to instantiate the custom Service class in the model.
Custom Request and Response
Request and Response are the handler's preprocessing and postprocessing classes: Request receives the user request, and Response returns the result. They can be understood simply as the pre and post logic of the handler.
Starwhale provides built-in Request implementations for Dataset types and a Json Response. Users can also customize the logic as follows:
import typing as t

from starwhale.api.service import (
    Request,
    Service,
    Response,
)

class CustomInput(Request):
    def load(self, req: t.Any) -> t.Any:
        return req

class CustomOutput(Response):
    def __init__(self, prefix: str) -> None:
        self.prefix = prefix

    def dump(self, req: str) -> bytes:
        return f"{self.prefix} {req}".encode("utf-8")

svc = Service()

@svc.api(request=CustomInput(), response=CustomOutput("hello"))
def foo(data: t.Any) -> t.Any:
    ...
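The pre and post logic can be traced end to end with plain Python stand-ins. None of the classes below come from starwhale.api.service: JsonInput, PrefixOutput and call_handler are hypothetical names, and the order load, handler, dump is the assumption this sketch illustrates.

```python
import json
from typing import Any, Callable


class JsonInput:
    """Hypothetical request hook: decode raw bytes into Python data."""

    def load(self, req: bytes) -> Any:
        return json.loads(req.decode("utf-8"))


class PrefixOutput:
    """Hypothetical response hook: turn the handler result into bytes."""

    def __init__(self, prefix: str) -> None:
        self.prefix = prefix

    def dump(self, resp: str) -> bytes:
        return f"{self.prefix} {resp}".encode("utf-8")


def call_handler(
    raw: bytes,
    request: JsonInput,
    response: PrefixOutput,
    handler: Callable[[Any], str],
) -> bytes:
    """Pre-process the request, run the handler, then post-process the result."""
    return response.dump(handler(request.load(raw)))


reply = call_handler(
    b'{"name": "starwhale"}',
    JsonInput(),
    PrefixOutput("hello"),
    lambda data: data["name"],
)
```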