diff --git a/qiskit_experiments/curve_analysis/__init__.py b/qiskit_experiments/curve_analysis/__init__.py index 5053735859..348014cb05 100644 --- a/qiskit_experiments/curve_analysis/__init__.py +++ b/qiskit_experiments/curve_analysis/__init__.py @@ -10,13 +10,427 @@ # copyright notice, and modified files need to carry a notice indicating # that they have been altered from the originals. -""" +r""" ========================================================= Curve Analysis (:mod:`qiskit_experiments.curve_analysis`) ========================================================= .. currentmodule:: qiskit_experiments.curve_analysis +Curve analysis provides the analysis base class for a variety of experiments with 1-D parameter scan. +Subclasses can override several class attributes to define the behavior of the +data formatting and fitting. Here we describe how code developers can create new curve fit +analysis inheriting from the base class. + + +Overview +======== + +The base class :class:`CurveAnalysis` supports multi-objective optimization on +different sets of experiment results, and you can also define multiple independent +optimization tasks in the same class. The analysis is implemented with the following data model. + +- Group: This is top level component of the fitting. If an analysis defines + multiple groups, it performs multiple independent optimizations + and generates results for every optimization group. + +- Series: This is a collection of curves to form a multi-objective optimization task. + The fit entries in the same series share the fit parameters, + and multiple experimental results are simultaneously fit to generate a single fit result. + +- Curve: This is a single entry of analysis. Every curve may take unique filter keywords + to extract corresponding (x, y) data from the whole experimental results, + along with the callback function used for the curve fitting. 
+
+To manage this structure, curve analysis provides a special dataclass :class:`SeriesDef`
+that represents an optimization configuration for a single curve's data.
+Based on this information, the analysis automatically constructs proper optimization logic.
+Thus one can avoid writing boilerplate code in various curve analyses
+and quickly write up the analysis code for a particular experiment.
+This analysis generates a set of :class:`~qiskit_experiments.framework.AnalysisResultData`
+entries with a single Matplotlib plot of the fit curves with raw data points.
+
+.. _curve_analysis_define_new:
+
+Defining new curves
+===================
+
+You can intuitively write the definition of a new curve, as shown below:
+
+.. code-block:: python3
+
+    from qiskit_experiments.curve_analysis import SeriesDef, fit_function
+
+    SeriesDef(
+        fit_func=lambda x, p0, p1, p2: fit_function.exponential_decay(
+            x, amp=p0, lamb=p1, baseline=p2
+        ),
+        model_description="p0 * exp(-p1 * x) + p2",
+    )
+
+The minimum field you must fill in is the ``fit_func``, which is a callback function used
+with the optimization solver. Here you must call one of the fit functions from the module
+:mod:`qiskit_experiments.curve_analysis.fit_function` because they implement
+special logic to compute error propagation.
+Note that the argument names of the fit function, i.e. ``[p0, p1, p2]``, are important because
+the signature of the provided fit function is parsed behind the scenes and
+used as the parameter names of the analysis result instance.
+Thus, these names may be used to populate your experiment database with the result.
+
+Optionally, you can set ``model_description``, which is a string representation of your
+fitting model that will be passed to the analysis result as a part of metadata.
+This instance should be set to :attr:`CurveAnalysis.__series__` as a Python list.
+
+For multi-objective optimization, i.e. 
if you have more than two curves that you +want to optimize simultaneously, you can create a list consisting of multiple curve entries. +In this case, the curves defined in the series definition form a single composite function +and the analysis solves the optimization problem: + +.. math:: + + \Theta_{\mbox{opt}} = \arg\min_\Theta \sum_i \sigma_i^{-2} (f_i(x_i, \theta_i) - y_i)^2, + +where :math:`\Theta = \{\theta_0, \theta_1, ..., \theta_N\} \in \mathbb{R}` and +this analysis has multiple fit functions each defined in the :attr:`SeriesDef.fit_func` +:math:`f_i(x_i, \theta_i)` with fit parameters :math:`\theta_i`. +Note that each fit model can take different parameters distinguished by function argument name +:math:`\theta_i = \{ p_{i0}, p_{i1}, ..., p_{iM} \}`. +Now we run a set of experiments that scans experiment parameters :math:`x_i` +and measure the outcomes :math:`y_i` with uncertainties :math:`\sigma_i`. +In the analysis, the solver will find the parameters :math:`\Theta_{\mbox{opt}}` +that simultaneously minimize the chi-squared values of all fit models defined in the series. +Here is an example how to implement such multi-objective optimization task: + +.. code-block:: python3 + + [ + SeriesDef( + name="my_experiment1", + fit_func=lambda x, p0, p1, p3: fit_function.exponential_decay( + x, amp=p0, lamb=p1, baseline=p3 + ), + filter_kwargs={"tag": 1}, + plot_color="red", + plot_symbol="^", + ), + SeriesDef( + name="my_experiment2", + fit_func=lambda x, p0, p2, p3: fit_function.exponential_decay( + x, amp=p0, lamb=p2, baseline=p3 + ), + filter_kwargs={"tag": 2}, + plot_color="blue", + plot_symbol="o", + ), + ] + +Note that now you also need to provide ``name`` and ``filter_kwargs`` to +distinguish the entries and filter the corresponding (x, y) data from the experiment results. +Optionally, you can provide ``plot_color`` and ``plot_symbol`` to visually +separate two curves in the plot. 
In this model, you have 4 parameters ``[p0, p1, p2, p3]`` +and the two curves share ``p0`` (``p3``) for ``amp`` (``baseline``) of +the :func:`exponential_decay` fit function. +Here one should expect the experiment results will have two classes of data with metadata +``"tag": 1`` and ``"tag": 2`` for ``my_experiment1`` and ``my_experiment2``, respectively. + +By using this model, one can flexibly set up your fit model. Here is another example: + +.. code-block:: python3 + + [ + SeriesDef( + name="my_experiment1", + fit_func=lambda x, p0, p1, p2, p3: fit_function.cos( + x, amp=p0, freq=p1, phase=p2, baseline=p3 + ), + filter_kwargs={"tag": 1}, + plot_color="red", + plot_symbol="^", + ), + SeriesDef( + name="my_experiment2", + fit_func=lambda x, p0, p1, p2, p3: fit_function.sin( + x, amp=p0, freq=p1, phase=p2, baseline=p3 + ), + filter_kwargs={"tag": 2}, + plot_color="blue", + plot_symbol="o", + ), + ] + +You have the same set of fit parameters for two curves, but now you fit two datasets +with different trigonometric functions. + +.. _curve_analysis_fixed_param: + +Fitting with fixed parameters +============================= + +You can also fix certain parameters during the curve fitting by specifying +parameter names in the class attribute :attr:`CurveAnalysis.__fixed_parameters__`. +This feature is useful especially when you want to define a subclass of +a particular analysis class. + +.. code-block:: python3 + + class AnalysisA(CurveAnalysis): + + __series__ = [ + SeriesDef( + fit_func=lambda x, p0, p1, p2: fit_function.exponential_decay( + x, amp=p0, lamb=p1, baseline=p2 + ), + ), + ] + + class AnalysisB(AnalysisA): + + __fixed_parameters__ = ["p0"] + + @classmethod + def _default_options(cls) -> Options: + options = super()._default_options() + options.p0 = 3 + + return options + +The parameter specified in :attr:`CurveAnalysis.__fixed_parameters__` should be provided +via the analysis options. 
Thus you may need to define a default value of the parameter in the +:meth:`CurveAnalysis._default_options`. +This code will give you identical fit model to the one defined in the following class: + +.. code-block:: python3 + + class AnalysisB(CurveAnalysis): + + __series__ = [ + SeriesDef( + fit_func=lambda x, p1, p2: fit_function.exponential_decay( + x, amp=3, lamb=p1, baseline=p2 + ), + ), + ] + +However, note that you can also inherit other features, e.g. the algorithm to +generate initial guesses, from the :class:`AnalysisA` in the first example. +On the other hand, in the latter case, you need to manually copy and paste +every logic defined in the :class:`AnalysisA`. + +.. _curve_analysis_multiple_tasks: + +Defining multiple tasks +======================= + +The code below shows how a subclass can define separate optimization tasks. + +.. code-block:: python3 + + [ + SeriesDef( + name="my_experiment1", + fit_func=lambda x, p0, p1, p2, p3: fit_function.cos( + x, amp=p0, freq=p1, phase=p2, baseline=p3 + ), + filter_kwargs={"tag": 1}, + plot_color="red", + plot_symbol="^", + group="cos", + ), + SeriesDef( + name="my_experiment2", + fit_func=lambda x, p0, p1, p2, p3: fit_function.sin( + x, amp=p0, freq=p1, phase=p2, baseline=p3 + ), + filter_kwargs={"tag": 2}, + plot_color="blue", + plot_symbol="o", + group="sin", + ), + ] + +The code looks almost identical to one in :ref:`curve_analysis_define_new`, +however, here we are providing a unique ``group`` value to each series definition. +In this configuration, the parameters ``[p0, p1, p2, p3]`` are not shared among +underlying curve fittings, thus we will get two fit parameter sets as a result. +This means any fit parameter value may change between curves. +The parameters can be distinguished by the ``group`` value passed to the result metadata. 
+ +This is identical to running individual ``my_experiment1`` and ``my_experiment2`` as a +:class:`~qiskit_experiments.framework.BatchExperiment` and collect fit results afterwards +in the analysis class attached to the batch experiment instance. + +.. code-block:: python3 + + from qiskit_experiments.framework import BatchExperiment + + exp1 = MyExperiment1(...) + exp2 = MyExperiment2(...) + + batch_exp = BatchExperiment([exp1, exp2]) + batch_exp.analysis = MyAnalysis(...) + +However, this may require a developer to write many classes, for example, +here you may want to implement :class:`MyAnalysis` analysis class in addition to the +analysis classes for :class:`MyExperiment1` and :class:`MyExperiment2`. +On the other hand, using ``group`` feature allows you to complete the same analysis +within a single class instance. + +.. _curve_analysis_format: + +Pre-processing the fit data +=========================== + +A subclass may override :meth:`CurveAnalysis._format_data` to perform custom pre-processing +on experiment data before computing the initial guesses. +Here a subclass may perform data smoothing, removal of outliers, etc... +By default, it performs averaging of y values over the same x values, +followed by the data sort by x values. +This method should return a :class:`CurveData` instance with `label="fit_ready"`. + +.. _curve_analysis_init_guess: + +Providing initial guesses and boundaries +======================================== + +A template for initial guesses and boundaries is automatically generated in +:attr:`CurveAnalysis.options` as a dictionary keyed on the parameter names parsed from +the series definition. The default values are set to ``None``. +The list of parameter names is also available in the property +:attr:`CurveAnalysis.fit_params`. + +A developer of the curve analysis subclass is recommended to override +:meth:`CurveAnalysis._generate_fit_guesses` to provide systematic guesses and boundaries +based on the experimental result. 
+
For accessing the formatted experiment result, you can use the :meth:`CurveAnalysis._data` method.
+
+.. code-block:: python3
+
+    curve_data = self._data(series_name="my_experiment1")
+
+    x = curve_data.x  # you can get x-values
+    y = curve_data.y  # you can get y-values
+
+In addition, there are several common initial guess estimators available in
+:mod:`qiskit_experiments.curve_analysis.guess`.
+
+When the fit is performed without any prior information on the parameters, it usually
+produces an unsatisfactory result. This method is called with a :class:`FitOptions`
+instance, which is a dict-like object. This class implements convenient methods to
+manage conflicts with user provided values, i.e. user provided values have higher priority,
+thus systematically generated values cannot override user values.
+
+.. code-block:: python3
+
+    opt1 = user_opt.copy()
+    opt1.p0.set_if_empty(p0=3)
+    opt1.bounds.set_if_empty(p0=(0, 10))
+    opt1.add_extra_options(method="lm")
+
+    opt2 = user_opt.copy()
+    opt2.p0.set_if_empty(p0=4)
+
+    return [opt1, opt2]
+
+``user_opt`` is a :class:`FitOptions` instance, which consists of sub-dictionaries for
+initial guesses (``.p0``) and boundaries (``.bounds``).
+The :meth:`.set_if_empty` method overrides the parameter value only when the user doesn't provide
+any prior information.
+``user_opt`` also has an extra configuration dictionary that is directly passed to
+the curve fitting function. Note that the :class:`CurveAnalysis` uses
+SciPy `curve_fit`_ function as a core solver. See the API documentation for available options.
+
+The final fitting outcome is determined with the following procedure.
+
+1. ``user_opt`` is initialized with the values provided by the user via the analysis options.
+
+2. The algorithmic guess is generated in :meth:`_generate_fit_guesses`,
+   where the logic implemented by a subclass may override the ``user_opt``.
+   If you want, you can copy it to create multiple fitting configurations. 
+ When multiple configurations are generated here, the curve fitter runs fitting multiple times. + +3. If multiple configurations are created, the curve analysis framework checks + duplication of configurations and performs fitting multiple times with a unique configuration set. + +4. The curve fitter computes a reduced chi-squared value for every attempt, + and finds the outcome with the minimum reduced chi-squared value. + If the fitting fails, or the solver cannot find reasonable parameters within the maximum recursion, + it just ignores the current configuration and moves to the next. + If all provided configurations fail, it raises ``UserWarning`` and continues + the rest of the analysis. + +5. Analysis results are automatically generated if the curve fitter + successfully finds the best-fit outcome. + +.. _curve_fit: https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.curve_fit.html + +.. _curve_analysis_evaluate: + +Evaluating fit quality +====================== + +A subclass can override :meth:`CurveAnalysis._evaluate_quality` method to provide an algorithm to +evaluate quality of the fitting. This method is called with the :class:`FitData` object +which contains fit parameters and the reduced chi-squared value. +Qiskit Experiments often uses the empirical condition chi-squared < 3 as a goodness of fitting. + +.. _curve_analysis_new_quantity: + +Computing new quantity with fit parameters +========================================== + +Once the best fit parameters are found, the :meth:`CurveAnalysis._extra_database_entry` method is +called with the same :class:`FitData` object. +You can compute new quantities by combining multiple fit parameters. + +.. 
code-block:: python3 + + from qiskit_experiments.framework import AnalysisResultData + + p0 = fit_data.fitval("p0") + p1 = fit_data.fitval("p1") + + extra_entry = AnalysisResultData( + name="p01", + value=p0 * p1, + ) + +Note that both ``p0`` and ``p1`` are `ufloat`_ object consisting of +a nominal value and an error value which assumes the standard deviation. +Since this object natively supports error propagation, you don't need to manually compute errors. + +.. _ufloat: https://pythonhosted.org/uncertainties/user_guide.html + +.. _curve_analysis_saved_entry: + +Managing fit parameters to be saved in the database +=================================================== + +By default :class:`CurveAnalysis` only stores a single entry ``@Parameters_``. +This entry consists of a value which is a list of all fitting parameters +with extra metadata involving their covariance matrix. +If you want to save a particular parameter as a standalone entry, +you can override the ``result_parameters`` option of the analysis. +By using :class:`ParameterRepr` representation, you can rename the parameter in the database. + +.. code-block:: python3 + + from qiskit_experiments.curve_analysis import ParameterRepr + + def _default_options(cls) -> Options: + options = super()._default_options() + options.result_parameters = [ParameterRepr("p0", "amp", "Hz")] + + return options + +Here the first argument ``p0`` is the target parameter defined in the series definition, +``amp`` is the representation of ``p0`` in the database, and ``Hz`` is the unit of the value +which might be optionally provided. + + +If there is any missing feature you can write a feature request as an issue in our +`GitHub `_. 
+ + Classes ======= diff --git a/qiskit_experiments/curve_analysis/curve_analysis.py b/qiskit_experiments/curve_analysis/curve_analysis.py index 53ccb1b116..4c52c1046c 100644 --- a/qiskit_experiments/curve_analysis/curve_analysis.py +++ b/qiskit_experiments/curve_analysis/curve_analysis.py @@ -16,26 +16,24 @@ # pylint: disable=invalid-name import copy -import dataclasses -import functools -import inspect +import itertools import warnings from abc import ABC -from typing import Any, Dict, List, Tuple, Callable, Union, Optional +from typing import Any, Dict, List, Tuple, Callable, Union, Optional, Iterator import numpy as np +import scipy.optimize as opt import uncertainties from uncertainties import unumpy as unp from qiskit.providers import Backend from qiskit_experiments.curve_analysis.curve_data import ( CurveData, - SeriesDef, + CompositeFitFunction, FitData, ParameterRepr, FitOptions, ) -from qiskit_experiments.curve_analysis.curve_fit import multi_curve_fit from qiskit_experiments.curve_analysis.data_processing import multi_mean_xy_data, data_sort from qiskit_experiments.curve_analysis.visualization import FitResultPlotters, PlotterStyle from qiskit_experiments.data_processing import DataProcessor @@ -59,175 +57,15 @@ class CurveAnalysis(BaseAnalysis, ABC): The subclasses can override class attributes to define the behavior of data extraction and fitting. This docstring describes how code developers can create a new curve fit analysis subclass inheriting from this base class. + See :mod:`qiskit_experiments.curve_analysis` for the developer note. Class Attributes: - - ``__series__``: A set of data points that will be fit to the same parameters - in the fit function. If this analysis contains multiple curves, - the same number of series definitions should be listed. Each series definition - is a :class:`SeriesDef` element, that may be initialized with - - - ``fit_func``: The function to which the data will be fit. 
- - ``filter_kwargs``: Circuit metadata key and value associated with this curve. - The data points of the curve are extracted from ExperimentData based on - this information. - - ``name``: Name of the curve. This is arbitrary data field, but should be unique. - - ``plot_color``: String color representation of this series in the plot. - - ``plot_symbol``: String formatter of the scatter of this series in the plot. + - ``__series__``: A list of :class:`SeriesDef` defining a fitting model + for every experimental data set. The attribute cannot be overridden. + Subclass of curve analysis must define this attribute. - ``__fixed_parameters__``: A list of parameter names fixed during the fitting. - These parameters should be provided in some way. For example, you can provide - them via experiment options or analysis options. Parameter names should be - used in the ``fit_func`` in the series definition. - - See the Examples below for more details. - - - Examples: - - **A fitting for single exponential decay curve** - - In this type of experiment, the analysis deals with a single curve. - Thus filter_kwargs and series name are not necessary defined. - - .. code-block:: - - class AnalysisExample(CurveAnalysis): - - __series__ = [ - SeriesDef( - fit_func=lambda x, p0, p1, p2: - exponential_decay(x, amp=p0, lamb=p1, baseline=p2), - ), - ] - - **A fitting for two exponential decay curve with partly shared parameter** - - In this type of experiment, the analysis deals with two curves. - We need a __series__ definition for each curve, and filter_kwargs should be - properly defined to separate each curve series. - - .. 
code-block:: - - class AnalysisExample(CurveAnalysis): - - __series__ = [ - SeriesDef( - name="my_experiment1", - fit_func=lambda x, p0, p1, p2, p3: - exponential_decay(x, amp=p0, lamb=p1, baseline=p3), - filter_kwargs={"experiment": 1}, - plot_color="red", - plot_symbol="^", - ), - SeriesDef( - name="my_experiment2", - fit_func=lambda x, p0, p1, p2, p3: - exponential_decay(x, amp=p0, lamb=p2, baseline=p3), - filter_kwargs={"experiment": 2}, - plot_color="blue", - plot_symbol="o", - ), - ] - - In this fit model, we have 4 parameters `p0, p1, p2, p3` and both series share - `p0` and `p3` as `amp` and `baseline` of the `exponential_decay` fit function. - Parameter `p1` (`p2`) is only used by `my_experiment1` (`my_experiment2`). - Both series have same fit function in this example. - - - **A fitting for two trigonometric curves with the same parameter** - - In this type of experiment, the analysis deals with two different curves. - However the parameters are shared with both functions. - - .. code-block:: - - class AnalysisExample(CurveAnalysis): - - __series__ = [ - SeriesDef( - name="my_experiment1", - fit_func=lambda x, p0, p1, p2, p3: - cos(x, amp=p0, freq=p1, phase=p2, baseline=p3), - filter_kwargs={"experiment": 1}, - plot_color="red", - plot_symbol="^", - ), - SeriesDef( - name="my_experiment2", - fit_func=lambda x, p0, p1, p2, p3: - sin(x, amp=p0, freq=p1, phase=p2, baseline=p3), - filter_kwargs={"experiment": 2}, - plot_color="blue", - plot_symbol="o", - ), - ] - - In this fit model, we have 4 parameters `p0, p1, p2, p3` and both series share - all parameters. However, these series have different fit curves, i.e. - `my_experiment1` (`my_experiment2`) uses the `cos` (`sin`) fit function. - - - **A fitting with fixed parameter** - - In this type of experiment, we can provide fixed fit function parameter. - This parameter should be assigned via analysis options - and not passed to the fitter function. - - .. 
code-block:: - - class AnalysisExample(CurveAnalysis): - - __series__ = [ - SeriesDef( - fit_func=lambda x, p0, p1, p2: - exponential_decay(x, amp=p0, lamb=p1, baseline=p2), - ), - ] - - __fixed_parameters__ = ["p1"] - - You can add arbitrary number of parameters to the class variable - ``__fixed_parameters__`` from the fit function arguments. - This parameter should be defined with the fit functions otherwise the analysis - instance cannot be created. In above example, parameter ``p1`` should be also - defined in the analysis options. This parameter will be excluded from the fit parameters - and thus will not appear in the analysis result. - - Notes: - This CurveAnalysis class provides several private methods that subclasses can override. - - - Customize pre-data processing: - Override :meth:`~self._format_data`. For example, here you can apply smoothing - to y values, remove outlier, or apply filter function to the data. - By default, data is sorted by x values and the measured values at the same - x value are averaged. - - - Create extra data from fit result: - Override :meth:`~self._extra_database_entry`. You need to return a list of - :class:`~qiskit_experiments.framework.analysis_result_data.AnalysisResultData` - object. This returns an empty list by default. - - - Customize fit quality evaluation: - Override :meth:`~self._evaluate_quality`. This value will be shown in the - database. You can determine the quality represented by the predefined string - "good" or "bad" based on fit result, - such as parameter uncertainty and reduced chi-squared value. - This returns ``None`` by default. This means evaluation is not performed. - - - Customize fitting options: - Override :meth:`~self._generate_fit_guesses`. For example, here you can - calculate initial guess from experiment data and setup fitter options. - - See docstring of each method for more details. - - Note that other private methods are not expected to be overridden. 
-        If you forcibly override these methods, the behavior of analysis logic is not well tested
-        and we cannot guarantee it works as expected (you may suffer from bugs).
-        Instead, you can open an issue in qiskit-experiment github to upgrade this class
-        with proper unittest framework.
-
-        https://github.com/Qiskit/qiskit-experiments/issues
+          These parameters should be provided via the analysis options.
     """
 
     #: List[SeriesDef]: List of mapping representing a data series
@@ -236,6 +74,65 @@ class AnalysisExample(CurveAnalysis):
     #: List[str]: Fixed parameter in fit function. Value should be set to the analysis options.
     __fixed_parameters__ = list()
 
+    # Automatically generated fitting functions of child class
+    _composite_funcs = None
+
+    # Automatically generated fitting parameters of child class
+    _fit_params = None
+
+    def __init_subclass__(cls, **kwargs):
+        """Parse series definition of subclass and set fit function and signature."""
+
+        super().__init_subclass__(**kwargs)
+
+        # Validate if all fixed parameter names are defined in the fit model
+        if cls.__fixed_parameters__:
+            # This generates an order-insensitive collection of all fitting parameters
+            # defined under the analysis. Since SeriesDef.signature returns a list,
+            # this generates a flat list from the iterator and typecasts it into a
+            # set to remove duplicated values. 
+ all_params = set(itertools.chain.from_iterable(s.signature for s in cls.__series__)) + if any(p not in all_params for p in cls.__fixed_parameters__): + raise AnalysisError("Not existing parameter is fixed.") + + # Parse series information and generate function and signature + fit_groups = dict() + for i, series in enumerate(cls.__series__): + if series.group not in fit_groups: + fit_groups[series.group] = { + "fit_functions": [series.fit_func], + "signatures": [series.signature], + "curve_inds": [i], + "models": [series.model_description], + } + else: + fit_groups[series.group]["fit_functions"].append(series.fit_func) + fit_groups[series.group]["signatures"].append(series.signature) + fit_groups[series.group]["curve_inds"].append(i) + fit_groups[series.group]["models"].append(series.model_description) + + composite_funcs = [ + CompositeFitFunction( + **config, + fixed_parameters=cls.__fixed_parameters__, + group=group, + ) + for group, config in fit_groups.items() + ] + + # Dictionary of fit functions to each group + cls._composite_funcs = composite_funcs + + # All fit parameter names that this analysis manages + # Let's keep order of parameters rather than using set, though code is bit messy. + # It is better to match composite function signature with the func in series definition. + fit_args = [] + for func in composite_funcs: + for param in func.signature: + if param not in fit_args: + fit_args.append(param) + cls._fit_params = fit_args + def __init__(self): """Initialize data fields that are privately accessed by methods.""" super().__init__() @@ -249,51 +146,142 @@ def __init__(self): #: Backend: backend object used for experimentation self.__backend = None - @classmethod - def _fit_params(cls) -> List[str]: - """Return a list of fitting parameters. 
+ @staticmethod + def curve_fit( + func: CompositeFitFunction, + xdata: np.ndarray, + ydata: np.ndarray, + sigma: np.ndarray, + p0: Dict[str, float], + bounds: Dict[str, Tuple[float, float]], + **kwargs, + ) -> FitData: + """Perform curve fitting. + + This is the scipy curve fit wrapper to manage named fit parameters and + return outcomes as ufloat objects with parameter correlation computed based on the + covariance matrix from the fitting. Result is returned as + :class:`~qiskit_experiments.curve_analysis.FitData` which is a special data container + for curve analysis. This method can perform multi-objective optimization with + multiple data series with related fit models. + + Args: + func: A fit function that can consist of multiple data series. + xdata: Numpy array representing X values. + ydata: Numpy array representing Y values. + sigma: Numpy array representing standard error of Y values. + p0: Dictionary of initial guesses for given fit function. + bounds: Dictionary of parameter boundary for given fit function. + **kwargs: Solver options. Returns: - A list of fit parameter names. + Fit result. Raises: - AnalysisError: When series definitions have inconsistent multi-objective fit function. - ValueError: When fixed parameter name is not used in the fit function. + AnalysisError: When invalid fit function is provided. + AnalysisError: When number of data points is too small. + AnalysisError: When curve fitting does not converge. """ - fsigs = set() - for series_def in cls.__series__: - fsigs.add(inspect.signature(series_def.fit_func)) - if len(fsigs) > 1: + if not isinstance(func, CompositeFitFunction): raise AnalysisError( - "Fit functions specified in the series definition have " - "different function signature. They should receive " - "the same parameter set for multi-objective function fit." + "CurveAnalysis subclass requires CompositeFitFunction instance to perform fitting. 
" + "Standard callback function is not acceptable due to missing signature metadata." ) - # remove the first function argument. this is usually x, i.e. not a fit parameter. - fit_params = list(list(fsigs)[0].parameters.keys())[1:] + lower = [bounds[p][0] for p in func.signature] + upper = [bounds[p][1] for p in func.signature] + scipy_bounds = (lower, upper) + scipy_p0 = list(p0.values()) - # remove fixed parameters - if cls.__fixed_parameters__ is not None: - for fixed_param in cls.__fixed_parameters__: - try: - fit_params.remove(fixed_param) - except ValueError as ex: - raise AnalysisError( - f"Defined fixed parameter {fixed_param} is not a fit function argument." - "Update series definition to ensure the parameter name is defined with " - f"fit functions. Currently available parameters are {fit_params}." - ) from ex + dof = len(ydata) - len(func.signature) + if dof < 1: + raise AnalysisError( + "The number of degrees of freedom of the fit data and model " + " (len(ydata) - len(p0)) is less than 1" + ) + + if np.any(np.nan_to_num(sigma) == 0): + # Sigma = 0 causes zero division error + sigma = None + else: + if "absolute_sigma" not in kwargs: + kwargs["absolute_sigma"] = True + + try: + # pylint: disable = unbalanced-tuple-unpacking + popt, pcov = opt.curve_fit( + func, + xdata, + ydata, + sigma=sigma, + p0=scipy_p0, + bounds=scipy_bounds, + **kwargs, + ) + except Exception as ex: + raise AnalysisError( + "scipy.optimize.curve_fit failed with error: {}".format(str(ex)) + ) from ex - return fit_params + # Compute outcome with errors correlation + if np.isfinite(pcov).all(): + # Keep parameter correlations in following analysis steps + fit_params = uncertainties.correlated_values(nom_values=popt, covariance_mat=pcov) + else: + # Ignore correlations, add standard error if finite. 
+            fit_params = [
+                uncertainties.ufloat(nominal_value=n, std_dev=s if np.isfinite(s) else np.nan)
+                for n, s in zip(popt, np.sqrt(np.diag(pcov)))
+            ]
+
+        # Calculate the reduced chi-squared for fit
+        yfits = func(xdata, *popt)
+        residues = (yfits - ydata) ** 2
+        if sigma is not None:
+            residues = residues / (sigma**2)
+        reduced_chisq = np.sum(residues) / dof
+
+        # Compute data range for fit
+        xdata_range = np.min(xdata), np.max(xdata)
+        ydata_range = np.min(ydata), np.max(ydata)
+
+        fit_model_descriptions = func.metadata.get("models", [])
+        if all(desc for desc in fit_model_descriptions):
+            fit_model_repr = ",".join(fit_model_descriptions)
+        else:
+            fit_model_repr = "not defined"
+
+        return FitData(
+            popt=list(fit_params),
+            popt_keys=func.signature,
+            pcov=pcov,
+            reduced_chisq=reduced_chisq,
+            dof=dof,
+            x_range=xdata_range,
+            y_range=ydata_range,
+            fit_model=fit_model_repr,
+            group=func.group,
+        )
+
+    @property
+    def composite_funcs(self) -> Iterator[CompositeFitFunction]:
+        """Return parsed composite fit function for this analysis instance."""
+        for fit_func in self._composite_funcs:
+            # Return copy of the composite fit function
+            # Note that this is a stateful class attribute, which can be modified during the fit.
+            # This may cause unexpected behavior in multithread execution, i.e. composite analysis.
+            yield fit_func.copy()
+
+    @property
+    def fit_params(self) -> List[str]:
+        """Return parameters of this curve analysis."""
+        return self._fit_params.copy()
 
     @classmethod
     def _default_options(cls) -> Options:
         """Return default analysis options.
 
         Analysis Options:
-            curve_fitter (Callable): A callback function to perform fitting with formatted data.
-                See :func:`~qiskit_experiments.analysis.multi_curve_fit` for example.
             data_processor (Callable): A callback function to format experiment data.
                 This can be a :class:`~qiskit_experiments.data_processing.DataProcessor`
                 instance that defines the `self.__call__` method. 
@@ -342,7 +330,6 @@ def _default_options(cls) -> Options: """ options = super()._default_options() - options.curve_fitter = multi_curve_fit options.data_processor = None options.normalization = False options.x_key = "xval" @@ -362,80 +349,30 @@ def _default_options(cls) -> Options: options.curve_fitter_options = dict() # automatically populate initial guess and boundary - fit_params = cls._fit_params() - options.p0 = {par_name: None for par_name in fit_params} - options.bounds = {par_name: None for par_name in fit_params} + options.p0 = {par_name: None for par_name in cls._fit_params} + options.bounds = {par_name: None for par_name in cls._fit_params} return options - def _generate_fit_guesses(self, user_opt: FitOptions) -> Union[FitOptions, List[FitOptions]]: - """Create algorithmic guess with analysis options and curve data. - - Subclasses can override this method. - - Subclass can access to the curve data with ``self._data()`` method. - If there are multiple series, you can get a specific series by specifying ``series_name``. - This method returns a ``CurveData`` instance, which is the `dataclass` - containing x values `.x`, y values `.y`, and sigma values `.y_err`. - - Subclasses can also access the defined analysis options with the ``self._get_option``. - For example: - - .. code-block:: - - curve_data = self._data(series_name="my_experiment1") - - if self._get_option("my_option1") == "abc": - param_a_guess = my_guess_function(curve_data.x, curve_data.y, ...) - else: - param_a_guess = ... + def set_options(self, **fields): + """Set the analysis options for :meth:`run` method. - user_opt.p0.set_if_empty(param_a=param_a_guess) - - Note that this subroutine can generate multiple fit options. - If multiple options are provided, the fitter will run multiple times, - i.e. once for each fit option. - The result with the best reduced chi-squared value is kept. 
- - Note that the argument ``user_opt`` is a collection of fitting options (initial guesses, - boundaries, and extra fitter options) with the user-provided guesses and boundaries. - The method :meth:`set_if_empty` sets the value of specified parameters of the fit options - dictionary only if the values of these parameters have not yet been assigned. - - .. code-block:: - - opt1 = user_opt.copy() - opt1.p0.set_if_empty(param_a=3) - - opt2 = user_opt.copy() - opt2.p0.set_if_empty(param_a=4) - - return [opt1, opt2] - - Note that you can also change fitter options (not only initial guesses and boundaries) - in each fit options with :meth:`add_extra_options` method. - This might be convenient to run fitting with multiple fit algorithms - or different fitting options. By default, this class uses `scipy.curve_fit` - as the fitter function. See Scipy API docs for more fitting option details. - See also :py:class:`qiskit_experiments.curve_analysis.curve_data.FitOptions` - for the behavior of the fit option instance. - - The final fit parameters are decided with the following procedure. - - 1. :class:`FitOptions` object is initialized with user options. - - 2. Algorithmic guess is generated here and override the default fit options object. - - 3. A list of fit options is returned. - - 4. Duplicated entries are eliminated. + Args: + fields: The fields to update the options - 5. The fitter optimizes parameters with unique fit options and outputs the chisq value. + Raises: + KeyError: When removed option ``curve_fitter`` is set. + """ + # TODO remove this in Qiskit Experiments v0.4 + if "curve_fitter" in fields: + raise KeyError( + "Option curve_fitter has been removed. Please directly override curve_fit method." + ) - 6. The best fit is selected based on the minimum chisq. + super().set_options(**fields) - Note that in this method you don't need to worry about the user provided initial guesses - and boundaries. These values are already assigned in the ``user_opts``. 
+ def _generate_fit_guesses(self, user_opt: FitOptions) -> Union[FitOptions, List[FitOptions]]: + """Create algorithmic guess with analysis options and curve data. Args: user_opt: Fit options filled with user provided guess and bounds. @@ -449,19 +386,6 @@ def _generate_fit_guesses(self, user_opt: FitOptions) -> Union[FitOptions, List[ def _format_data(self, data: CurveData) -> CurveData: """An optional subroutine to perform data pre-processing. - Subclasses can override this method to apply pre-precessing to data values to fit. - - For example, - - - Apply smoothing to y values to deal with noisy observed values - - Remove redundant data points (outlier) - - Apply frequency filter function - - etc... - - By default, the analysis just takes average over the same x values and sort - data index by the x values in ascending order. - .. note:: The data returned by this method should have the label "fit_ready". @@ -501,8 +425,6 @@ def _format_data(self, data: CurveData) -> CurveData: def _extra_database_entry(self, fit_data: FitData) -> List[AnalysisResultData]: """Calculate new quantity from the fit result. - Subclasses can override this method to do post analysis. - Args: fit_data: Fit result. @@ -515,8 +437,6 @@ def _extra_database_entry(self, fit_data: FitData) -> List[AnalysisResultData]: def _evaluate_quality(self, fit_data: FitData) -> Union[str, None]: """Evaluate quality of the fit result. - Subclasses can override this method to do post analysis. - Args: fit_data: Fit result. @@ -694,16 +614,6 @@ def _transpile_options(self, index: int = -1) -> Dict[str, Any]: # Ignore experiment metadata or job metadata is not set or key is not found return None - def _extra_metadata(self) -> Dict[str, Any]: - """Returns extra metadata. - - Returns: - Extra metadata explicitly added by the experiment subclass. 
- """ - exclude = ["experiment_type", "num_qubits", "physical_qubits", "job_metadata"] - - return {k: v for k, v in self.__experiment_metadata.items() if k not in exclude} - def _data( self, series_name: Optional[str] = None, @@ -749,31 +659,6 @@ def _data( def _run_analysis( self, experiment_data: ExperimentData ) -> Tuple[List[AnalysisResultData], List["pyplot.Figure"]]: - # - # 1. Parse arguments - # - - # Update all fit functions in the series definitions if fixed parameter is defined. - # Fixed parameters should be provided by the analysis options. - if self.__fixed_parameters__: - assigned_params = {k: self.options.get(k, None) for k in self.__fixed_parameters__} - - # Check if all parameters are assigned. - if any(v is None for v in assigned_params.values()): - raise AnalysisError( - f"Unassigned fixed-value parameters for the fit " - f"function {self.__class__.__name__}." - f"All values of fixed-parameters, i.e. {self.__fixed_parameters__}, " - "must be provided by the analysis options to run this analysis." - ) - - # Override series definition with assigned fit functions. - assigned_series = [] - for series_def in self.__series__: - dict_def = dataclasses.asdict(series_def) - dict_def["fit_func"] = functools.partial(series_def.fit_func, **assigned_params) - assigned_series.append(SeriesDef(**dict_def)) - self.__series__ = assigned_series # get experiment metadata try: @@ -789,7 +674,7 @@ def _run_analysis( pass # - # 2. Setup data processor + # 1. Setup data processor # # If no data processor was provided at run-time we infer one from the job @@ -804,70 +689,91 @@ def _run_analysis( data_processor.train(data=experiment_data.data()) # - # 3. Extract curve entries from experiment data + # 2. Extract curve entries from experiment data # self._extract_curves(experiment_data=experiment_data, data_processor=data_processor) # - # 4. Run fitting + # 3. 
Run fitting # formatted_data = self._data(label="fit_ready") + xvals = formatted_data.x + yvals = formatted_data.y + sigma = formatted_data.y_err + index = formatted_data.data_index - # Generate algorithmic initial guesses and boundaries - default_fit_opt = FitOptions( - parameters=self._fit_params(), - default_p0=self.options.p0, - default_bounds=self.options.bounds, - **self.options.curve_fitter_options, - ) - - fit_options = self._generate_fit_guesses(default_fit_opt) - if isinstance(fit_options, FitOptions): - fit_options = [fit_options] + fixed_params = {p: self.options.get(p) for p in self.__fixed_parameters__} - # Run fit for each configuration fit_results = [] - for fit_opt in set(fit_options): - try: - fit_result = self.options.curve_fitter( - funcs=[series_def.fit_func for series_def in self.__series__], - series=formatted_data.data_index, - xdata=formatted_data.x, - ydata=formatted_data.y, - sigma=formatted_data.y_err, - **fit_opt.options, - ) - fit_results.append(fit_result) - except AnalysisError: - # Some guesses might be too far from the true parameters and may thus fail. - # We ignore initial guesses that fail and continue with the next fit candidate. - pass - - # Find best value with chi-squared value - if len(fit_results) == 0: - warnings.warn( - "All initial guesses and parameter boundaries failed to fit the data. 
" - "Please provide better initial guesses or fit parameter boundaries.", - UserWarning, + for fit_func in self.composite_funcs: + # Set parameter and data index to the composite fit function + if fixed_params: + fit_func.bind_parameters(**fixed_params) + + # Valid data index for this group + if len(self.__series__) > 1: + series_inds = [ + i for i, s in enumerate(self.__series__) if s.group == fit_func.group + ] + data_inds = np.full(index.size, False, dtype=bool) + for i in series_inds: + data_inds |= index == i + else: + data_inds = np.full(index.size, True, dtype=bool) + + fit_func.data_index = index[data_inds] + + # Generate algorithmic initial guesses and boundaries + default_fit_opt = FitOptions( + group=fit_func.group, + parameters=fit_func.signature, + default_p0=self.options.p0, + default_bounds=self.options.bounds, + **self.options.curve_fitter_options, ) - # at least return raw data points rather than terminating - fit_result = None - else: - fit_result = sorted(fit_results, key=lambda r: r.reduced_chisq)[0] + + fit_options = self._generate_fit_guesses(default_fit_opt) + if isinstance(fit_options, FitOptions): + fit_options = [fit_options] + + # Run fit for each configuration + temp_results = [] + for fit_opt in set(fit_options): + try: + fit_result = self.curve_fit( + func=fit_func, + xdata=xvals[data_inds], + ydata=yvals[data_inds], + sigma=sigma[data_inds], + **fit_opt.options, + ) + temp_results.append(fit_result) + except AnalysisError: + # Some guesses might be too far from the true parameters and may thus fail. + # We ignore initial guesses that fail and continue with the next fit candidate. + pass + + # Find best value with chi-squared value + if len(temp_results) == 0: + warnings.warn( + "All initial guesses and parameter boundaries failed to fit the data " + f"in the fit group {fit_func.group}. 
Please provide better initial guesses " + "or fit parameter boundaries.", + UserWarning, + ) + else: + best_fit_result = sorted(temp_results, key=lambda r: r.reduced_chisq)[0] + fit_results.append(best_fit_result) # - # 5. Create database entry + # 4. Create database entry # analysis_results = [] - if fit_result: + for fit_result in fit_results: + # pylint: disable=assignment-from-none quality = self._evaluate_quality(fit_data=fit_result) - fit_models = { - series_def.name: series_def.model_description or "no description" - for series_def in self.__series__ - } - # overview entry analysis_results.append( AnalysisResultData( @@ -876,10 +782,11 @@ def _run_analysis( chisq=fit_result.reduced_chisq, quality=quality, extra={ + "group": fit_result.group, "popt_keys": fit_result.popt_keys, "dof": fit_result.dof, "covariance_mat": fit_result.pcov, - "fit_models": fit_models, + "fit_models": fit_result.fit_model, **self.options.extra, }, ) @@ -910,12 +817,19 @@ def _run_analysis( value=fit_val, chisq=fit_result.reduced_chisq, quality=quality, - extra=metadata, + extra={ + "group": fit_result.group, + "fit_models": fit_result.fit_model, + **metadata, + }, ) analysis_results.append(result_entry) - # add extra database entries - analysis_results.extend(self._extra_database_entry(fit_result)) + # add extra database entries + extra_entries = self._extra_database_entry( + fit_results[0] if len(fit_results) == 1 else fit_results # for backward compatibility + ) + analysis_results.extend(extra_entries) if self.options.return_data_points: # save raw data points in the data base if option is set (default to false) @@ -938,7 +852,7 @@ def _run_analysis( analysis_results.append(raw_data_entry) # - # 6. Create figures + # 5. 
Create figures # if self.options.plot: fit_figure = FitResultPlotters[self.options.curve_plotter].value.draw( @@ -953,7 +867,8 @@ def _run_analysis( "xlim": self.options.xlim, "ylim": self.options.ylim, }, - fit_data=fit_result, + fit_data=fit_results, + fix_parameters=fixed_params, result_entries=analysis_results, style=self.options.style, axis=self.options.axis, diff --git a/qiskit_experiments/curve_analysis/curve_data.py b/qiskit_experiments/curve_analysis/curve_data.py index 294a74f5ac..5245aabd9b 100644 --- a/qiskit_experiments/curve_analysis/curve_data.py +++ b/qiskit_experiments/curve_analysis/curve_data.py @@ -15,6 +15,7 @@ """ import dataclasses +import inspect from typing import Any, Dict, Callable, Union, List, Tuple, Optional, Iterable import numpy as np @@ -47,6 +48,156 @@ class SeriesDef: # Index of canvas if the result figure is multi-panel canvas: Optional[int] = None + # Automatically extracted signature of the fit function + signature: List[str] = dataclasses.field(init=False) + + # Name of group. Curves in the same group are simultaneously fit. + group: Optional[str] = "default" + + def __post_init__(self): + """Implicitly parse fit function signature for fit function.""" + # Parse parameter names defined in the fit function. + # Since SeriesDef doesn't explicitly define the fit parameter names, + # it should use python inspect module to investigate the fit function signature. + # Since this is relatively heavy overhead, this should be called once after instantiation. + # https://docs.python.org/3/library/inspect.html#inspect.signature + # + # Note that fit function usually takes arguments F(x, p0, p1, p2, ...) + # thus the first value should be excluded. + sig = list(inspect.signature(self.fit_func).parameters.keys())[1:] + # Note that this dataclass is frozen + object.__setattr__(self, "signature", sig) + + +class CompositeFitFunction: + """Function-like object that is generated by a curve analysis subclass. 
+ + This is a function-like object that implements a fit model as a ``__call__`` magic method, + thus it behaves like a Python function that the SciPy curve_fit solver accepts. + Note that the fit function there only accepts variadic arguments. + + This class ties together the fit function and associated parameter names to + perform correct parameter mapping among multiple objective functions with different signatures, + in which some parameters may be excluded from the fitting when they are fixed. + """ + + def __init__( + self, + group: str, + fit_functions: [List[Callable]], + signatures: List[List[str]], + curve_inds: List[int], + fixed_parameters: Optional[List[str]] = None, + **metadata, + ): + """Create new composite function. + + Args: + group: A name of the fit group that this function belongs to. + fit_functions: List of callables that defines fit function of a single series. + signatures: List of parameter names of a single series. + curve_inds: List of index corresponding to the curve data. + fixed_parameters: List of parameter names that are fixed in the fit. + **metadata: An arbitrary dictionary with information about this fit function. + + Raises: + AnalysisError: When ``fit_functions`` and ``signatures`` don't match. 
+ """ + if len(fit_functions) != len(signatures): + raise AnalysisError("Different numbers of fit_functions and signatures are given.") + + if fixed_parameters is None: + fixed_parameters = tuple() + + self._group = group + self._fit_functions = fit_functions + self._signatures = signatures + self._curve_inds = curve_inds + self._metadata = metadata or dict() + + # Parameters that can be overridden + self._fixed_params = {p: None for p in fixed_parameters} + self._data_index = None + + fit_args = [] + # Logic is not efficient but should keep order of parameters for backward compatibility + for signature in signatures: + for param in signature: + if param not in fit_args and param not in fixed_parameters: + fit_args.append(param) + + self._full_params = fit_args + + def __call__(self, x: np.ndarray, *params) -> np.ndarray: + """Called by the scipy fit function. + + Args: + x: Composite X values array. + *params: Variadic argument of fitting parameters. + + Returns: + Computed Y values array. 
+ """ + kwparams = dict(zip(self._full_params, params)) + kwparams.update(self._fixed_params) + + y = np.zeros(x.size) + for i, func, sig in zip(self._curve_inds, self._fit_functions, self._signatures): + if self._data_index is not None: + inds = self._data_index == i + else: + # Use all data if data index is not set + inds = np.full(x.size, True, dtype=bool) + + y[inds] = func(x[inds], **{p: kwparams[p] for p in sig}) + + return y + + def bind_parameters(self, **kwparams): + """Set fixed parameters.""" + bind_dict = {k: kwparams[k] for k in self._fixed_params.keys() if k in kwparams} + self._fixed_params.update(bind_dict) + + @property + def data_index(self) -> np.ndarray: + """Return current data index mapping.""" + return self._data_index + + @data_index.setter + def data_index(self, new_indices: np.ndarray): + """Set data index mapping for current fit data.""" + self._data_index = new_indices + + @property + def signature(self) -> List[str]: + """Return signature of the composite fit function.""" + return self._full_params + + @property + def metadata(self) -> Dict[str, Any]: + """Return metadata of this fit function.""" + return self._metadata + + @property + def group(self) -> str: + """Return a group that this function belongs to.""" + return self._group + + def copy(self): + """Return copy of this function. 
Assigned parameters and indices are refreshed.""" + return CompositeFitFunction( + group=self.group, + fit_functions=self._fit_functions, + signatures=self._signatures, + curve_inds=self._curve_inds, + fixed_parameters=list(self._fixed_params.keys()), + **self.metadata.copy(), + ) + + def __repr__(self): + sigrepr = ", ".join(self.signature) + return f"{self.__class__.__name__}(x, {sigrepr}; group={self.group})" + @dataclasses.dataclass(frozen=True) class CurveData: @@ -99,6 +250,12 @@ class FitData: # Y data range y_range: Tuple[float, float] + # String representation of fit model + fit_model: str = "not defined" + + # String representation of the group that this fit belongs to. + group: str = "default" + def fitval(self, key: str) -> uncertainties.UFloat: """A helper method to get fit value object from parameter key name. @@ -278,11 +435,14 @@ class FitOptions: def __init__( self, + group: str, parameters: List[str], default_p0: Optional[Union[Iterable[float], Dict[str, float]]] = None, default_bounds: Optional[Union[Iterable[Tuple], Dict[str, Tuple]]] = None, **extra, ): + self.group = group + # These are private members so that user cannot directly override values + # without implicitly implemented validation logic. No setter will be provided. 
self.__p0 = InitialGuesses(parameters, default_p0) @@ -309,6 +469,7 @@ def add_extra_options(self, **kwargs): def copy(self): """Create copy of this option.""" return FitOptions( + group=self.group, parameters=list(self.__p0.keys()), default_p0=dict(self.__p0), default_bounds=dict(self.__bounds), diff --git a/qiskit_experiments/curve_analysis/visualization/fit_result_plotters.py b/qiskit_experiments/curve_analysis/visualization/fit_result_plotters.py index 537c3ed759..ebb836d674 100644 --- a/qiskit_experiments/curve_analysis/visualization/fit_result_plotters.py +++ b/qiskit_experiments/curve_analysis/visualization/fit_result_plotters.py @@ -22,6 +22,7 @@ """ from collections import defaultdict +import functools from typing import List, Dict, Optional import uncertainties @@ -46,7 +47,8 @@ def draw( raw_samples: List[CurveData], fit_samples: List[CurveData], tick_labels: Dict[str, str], - fit_data: FitData, + fit_data: List[FitData], + fix_parameters: Dict[str, float], result_entries: List[AnalysisResultData], style: Optional[PlotterStyle] = None, axis: Optional["matplotlib.axes.Axes"] = None, @@ -60,6 +62,7 @@ def draw( tick_labels: Dictionary of axis label information. Axis units and label for x and y value should be explained. fit_data: fit data generated by the analysis. + fix_parameters: parameter not being in fitting. result_entries: List of analysis result data entries. style: Optional. A configuration object to modify the appearance of the figure. axis: Optional. A matplotlib Axis object. 
@@ -84,6 +87,7 @@ def draw( raw_sample=raw_samp, fit_sample=fit_samp, fit_data=fit_data, + fix_parameters=fix_parameters, style=style, ) @@ -116,11 +120,20 @@ def draw( # write analysis report if fit_data: report_str = write_fit_report(result_entries) - report_str += r"Fit $\chi^2$ = " + f"{fit_data.reduced_chisq: .4g}" + + if len(fit_data) > 2: + chisq_strs = [] + for fit_datum in fit_data: + chisq_strs.append( + r"Fit $\chi^2$ = " + f"{fit_datum.reduced_chisq: .4g} ({fit_datum.group})" + ) + report_str += "\n".join(chisq_strs) + else: + report_str += r"Fit $\chi^2$ = " + f"{fit_data[0].reduced_chisq: .4g}" report_handler = axis.text( *style.fit_report_rpos, - report_str, + s=report_str, ha="center", va="top", size=style.fit_report_text_size, @@ -146,7 +159,8 @@ def draw( raw_samples: List[CurveData], fit_samples: List[CurveData], tick_labels: Dict[str, str], - fit_data: FitData, + fit_data: List[FitData], + fix_parameters: Dict[str, float], result_entries: List[AnalysisResultData], style: Optional[PlotterStyle] = None, axis: Optional["matplotlib.axes.Axes"] = None, @@ -160,6 +174,7 @@ def draw( tick_labels: Dictionary of axis label information. Axis units and label for x and y value should be explained. fit_data: fit data generated by the analysis. + fix_parameters: parameter not being in fitting. result_entries: List of analysis result data entries. style: Optional. A configuration object to modify the appearance of the figure. axis: Optional. A matplotlib Axis object. 
@@ -222,6 +237,7 @@ def draw( raw_sample=raw_samples[curve_ind], fit_sample=fit_samples[curve_ind], fit_data=fit_data, + fix_parameters=fix_parameters, style=style, ) @@ -268,11 +284,20 @@ def draw( # write analysis report if fit_data: report_str = write_fit_report(result_entries) - report_str += r"Fit $\chi^2$ = " + f"{fit_data.reduced_chisq: .4g}" + + if len(fit_data) > 2: + chisq_strs = [] + for fit_datum in fit_data: + chisq_strs.append( + r"Fit $\chi^2$ = " + f"{fit_datum.reduced_chisq: .4g} ({fit_datum.group})" + ) + report_str += "\n".join(chisq_strs) + else: + report_str += r"Fit $\chi^2$ = " + f"{fit_data[0].reduced_chisq: .4g}" report_handler = axis.text( *style.fit_report_rpos, - report_str, + s=report_str, ha="center", va="top", size=style.fit_report_text_size, @@ -293,7 +318,8 @@ def draw_single_curve_mpl( series_def: SeriesDef, raw_sample: CurveData, fit_sample: CurveData, - fit_data: FitData, + fit_data: List[FitData], + fix_parameters: Dict[str, float], style: PlotterStyle, ): """A function that draws a single curve on the given plotter canvas. @@ -304,6 +330,7 @@ def draw_single_curve_mpl( raw_sample: Raw sample data. fit_sample: Formatted sample data. fit_data: Fitting parameter collection. + fix_parameters: Parameters not being in the fitting. style: Style sheet for plotting. 
""" @@ -330,15 +357,21 @@ def draw_single_curve_mpl( ) # plot fit curve - if fit_data: - plot_curve_fit( - func=series_def.fit_func, - result=fit_data, - ax=axis, - color=series_def.plot_color, - zorder=2, - fit_uncertainty=style.plot_sigma, - ) + for fit_datum in fit_data: + if series_def.group == fit_datum.group: + if fix_parameters: + fit_func = functools.partial(series_def.fit_func, **fix_parameters) + else: + fit_func = series_def.fit_func + + plot_curve_fit( + func=fit_func, + result=fit_datum, + ax=axis, + color=series_def.plot_color, + zorder=2, + fit_uncertainty=style.plot_sigma, + ) def write_fit_report(result_entries: List[AnalysisResultData]) -> str: diff --git a/test/curve_analysis/test_curve_analysis_base_class.py b/test/curve_analysis/test_curve_analysis_base_class.py new file mode 100644 index 0000000000..b005d37675 --- /dev/null +++ b/test/curve_analysis/test_curve_analysis_base_class.py @@ -0,0 +1,873 @@ +# This code is part of Qiskit. +# +# (C) Copyright IBM 2021. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. 
+ +# pylint: disable=invalid-name, missing-class-docstring, unsubscriptable-object + +"""Test curve fitting base class.""" +from test.base import QiskitExperimentsTestCase + +import numpy as np +from uncertainties import unumpy as unp + +from qiskit_experiments.curve_analysis import CurveAnalysis, fit_function +from qiskit_experiments.curve_analysis.curve_data import ( + SeriesDef, + CompositeFitFunction, + FitData, + ParameterRepr, +) +from qiskit_experiments.data_processing import DataProcessor, Probability +from qiskit_experiments.exceptions import AnalysisError +from qiskit_experiments.framework import ExperimentData, AnalysisResultData, CompositeAnalysis + + +class TestCompositeFunction(QiskitExperimentsTestCase): + """Test behavior of CompositeFunction which is a core object of CurveAnalysis. + + This is new fit function wrapper introduced in Qiskit Experiments 0.3. + This function-like object should manage parameter assignment and mapping to + manage multiple sub functions (curves) for multi-objective optimization. 
+ """ + + def test_single_function(self): + """A simple testcase for having only single fit function.""" + + def child_function(x, p0, p1): + return p0 * x + p1 + + function = CompositeFitFunction( + group="default", + fit_functions=[child_function], + signatures=[["p0", "p1"]], + curve_inds=[0], + model="p0 x + p1", + ) + + self.assertListEqual(function.signature, ["p0", "p1"]) + self.assertEqual(function.group, "default") + self.assertEqual(function.metadata["model"], "p0 x + p1") + self.assertEqual(repr(function), "CompositeFitFunction(x, p0, p1; group=default)") + + x = np.linspace(0, 1, 10) + p0 = 1 + p1 = 2 + ref_y = child_function(x, p0, p1) + test_y = function(x, p0, p1) + + np.testing.assert_array_equal(ref_y, test_y) + + def test_single_function_parameter_fixed(self): + """Test when some parameters are fixed.""" + + def child_function(x, p0, p1): + return p0 * x + p1 + + function = CompositeFitFunction( + group="default", + fit_functions=[child_function], + signatures=[["p0", "p1"]], + curve_inds=[0], + fixed_parameters=["p0"], + ) + + self.assertListEqual(function.signature, ["p1"]) + + x = np.linspace(0, 1, 10) + p0 = 1 + p1 = 2 + ref_y = child_function(x, p0, p1) + + # Need to call parameter binding here + function.bind_parameters(p0=p0) + test_y = function(x, p1) + + np.testing.assert_array_equal(ref_y, test_y) + + def test_multiple_functions(self): + """Test with multiple functions.""" + + def child_function1(x, p0, p1): + return p0 * x + p1 + + def child_function2(x, p0, p2): + return p0 * x - p2 + + function = CompositeFitFunction( + group="default", + fit_functions=[child_function1, child_function2], + signatures=[["p0", "p1"], ["p0", "p2"]], + curve_inds=[0, 1], + ) + + self.assertListEqual(function.signature, ["p0", "p1", "p2"]) + + x1 = np.linspace(0, 1, 10) + x2 = np.linspace(2, 3, 10) + p0 = 1 + p1 = 2 + p2 = 3 + ref_y1 = child_function1(x1, p0, p1) + ref_y2 = child_function2(x2, p0, p2) + + data_index = np.array([0, 0, 0, 0, 0, 0, 0, 0, 0, 
0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]) + ref_y = np.zeros(ref_y1.size + ref_y2.size) + ref_y[data_index == 0] = ref_y1 + ref_y[data_index == 1] = ref_y2 + + # Need to set data index + function.data_index = data_index + test_y = function(np.r_[x1, x2], p0, p1, p2) + + np.testing.assert_array_equal(ref_y, test_y) + + def test_multiple_functions_with_fixed_parameter(self): + """Test with multiple functions while some parameters are fixed.""" + + def child_function1(x, p0, p1): + return p0 * x + p1 + + def child_function2(x, p0, p2): + return p0 * x - p2 + + function = CompositeFitFunction( + group="default", + fit_functions=[child_function1, child_function2], + signatures=[["p0", "p1"], ["p0", "p2"]], + curve_inds=[0, 1], + fixed_parameters=["p1"], + ) + + self.assertListEqual(function.signature, ["p0", "p2"]) + + x1 = np.linspace(0, 1, 10) + x2 = np.linspace(2, 3, 10) + p0 = 1 + p1 = 2 + p2 = 3 + ref_y1 = child_function1(x1, p0, p1) + ref_y2 = child_function2(x2, p0, p2) + + data_index = np.array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]) + ref_y = np.zeros(ref_y1.size + ref_y2.size) + ref_y[data_index == 0] = ref_y1 + ref_y[data_index == 1] = ref_y2 + + # Need to set data index and fixed parameter here + function.data_index = data_index + function.bind_parameters(p1=p1) + test_y = function(np.r_[x1, x2], p0, p2) + + np.testing.assert_array_equal(ref_y, test_y) + + +class TestCurveFit(QiskitExperimentsTestCase): + """Test core fitting functionality by bypassing analysis framework. + + CurveAnalysis can provide the fit function and fit algorithm via its + instance property and static method, so we can unittest only the fitting part. + This test suite validates the fitting function in various situations including + single curve, multiple curves, parameter fixing, etc... 
+ """ + + def test_single_function(self): + """Test case for single curve entry.""" + + def child_function(x, p0, p1): + return p0 * x + p1 + + class MyCurveFit(CurveAnalysis): + __series__ = [SeriesDef(fit_func=child_function)] + + instance = MyCurveFit() + + x = np.linspace(0, 1, 10) + p0 = 1 + p1 = 2 + fake_outcome = child_function(x, p0, p1) + + fit_func = next(instance.composite_funcs) + result = instance.curve_fit( + func=fit_func, + xdata=x, + ydata=fake_outcome, + sigma=np.zeros_like(fake_outcome), + p0={"p0": 0.9, "p1": 2.1}, + bounds={"p0": (0, 2), "p1": (1, 3)}, + ) + self.assertIsInstance(result, FitData) + + self.assertEqual(result.fit_model, "not defined") + self.assertEqual(result.popt_keys, ["p0", "p1"]) + self.assertEqual(result.dof, 8) + self.assertEqual(result.group, "default") + np.testing.assert_array_almost_equal(unp.nominal_values(result.popt), [p0, p1]) + + # test if values are operable + p0_val = result.fitval("p0") + p1_val = result.fitval("p1") + + new_quantity = p0_val + p1_val + self.assertAlmostEqual(new_quantity.s, np.sqrt(p0_val.s**2 + p1_val.s**2)) + + def test_multiple_functions(self): + """Test case for multiple curve entries.""" + + def child_function1(x, p0, p1): + return p0 * x + p1 + + def child_function2(x, p0, p2): + return p0 * x - p2 + + class MyCurveFit(CurveAnalysis): + __series__ = [ + SeriesDef( + fit_func=child_function1, + model_description="p0 x + p1", + ), + SeriesDef( + fit_func=child_function2, + model_description="p0 x - p2", + ), + ] + + instance = MyCurveFit() + + x1 = np.linspace(0, 1, 10) + x2 = np.linspace(2, 3, 10) + p0 = 1 + p1 = 2 + p2 = 3 + ref_y1 = child_function1(x1, p0, p1) + ref_y2 = child_function2(x2, p0, p2) + + data_index = np.array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]) + fake_outcome = np.zeros(ref_y1.size + ref_y2.size) + fake_outcome[data_index == 0] = ref_y1 + fake_outcome[data_index == 1] = ref_y2 + + fit_func = next(instance.composite_funcs) + fit_func.data_index = 
data_index + + result = instance.curve_fit( + func=fit_func, + xdata=np.r_[x1, x2], + ydata=fake_outcome, + sigma=np.zeros_like(fake_outcome), + p0={"p0": 0.9, "p1": 2.1, "p2": 2.9}, + bounds={"p0": (0, 2), "p1": (1, 3), "p2": (2, 4)}, + ) + + self.assertEqual(result.fit_model, "p0 x + p1,p0 x - p2") + self.assertEqual(result.popt_keys, ["p0", "p1", "p2"]) + np.testing.assert_array_almost_equal(unp.nominal_values(result.popt), [p0, p1, p2]) + + def test_assert_dof_error(self): + """Test raise an DOF error when input data size is too small.""" + + def child_function(x, p0, p1): + return p0 * x + p1 + + class MyCurveFit(CurveAnalysis): + __series__ = [SeriesDef(fit_func=child_function)] + + instance = MyCurveFit() + + x = np.array([2]) # DOF = 0 + p0 = 1 + p1 = 2 + fake_outcome = child_function(x, p0, p1) + + fit_func = next(instance.composite_funcs) + with self.assertRaises(AnalysisError): + instance.curve_fit( + func=fit_func, + xdata=x, + ydata=fake_outcome, + sigma=np.zeros_like(fake_outcome), + p0={"p0": 0.9, "p1": 2.1}, + bounds={"p0": (0, 2), "p1": (1, 3)}, + ) + + def test_assert_invalid_fit(self): + """Test scipy solver error is converted into AnalysisError.""" + + def child_function(x, p0, p1): + return p0 * x + p1 + + class MyCurveFit(CurveAnalysis): + __series__ = [SeriesDef(fit_func=child_function)] + + instance = MyCurveFit() + + x = np.linspace(0, 1, 10) + p0 = 1 + p1 = 2 + fake_outcome = child_function(x, p0, p1) + + fit_func = next(instance.composite_funcs) + with self.assertRaises(AnalysisError): + instance.curve_fit( + func=fit_func, + xdata=x, + ydata=fake_outcome, + sigma=np.zeros_like(fake_outcome), + p0={"p0": 0, "p1": 2.1}, + bounds={"p0": (-1, 0), "p1": (-1, 0)}, # impossible to fit within this range + ) + + def test_assert_fit_with_bare_calback(self): + """Test raise error when normal callback is set.""" + + def child_function(x, p0, p1): + return p0 * x + p1 + + class MyCurveFit(CurveAnalysis): + __series__ = 
[SeriesDef(fit_func=child_function)] + + instance = MyCurveFit() + + x = np.linspace(0, 1, 10) + p0 = 1 + p1 = 2 + fake_outcome = child_function(x, p0, p1) + + with self.assertRaises(AnalysisError): + instance.curve_fit( + func=child_function, # cannot manage parameter mapping and metadata + xdata=x, + ydata=fake_outcome, + sigma=np.zeros_like(fake_outcome), + p0={"p0": 0, "p1": 2.1}, + bounds={"p0": (-1, 0), "p1": (-1, 0)}, # impossible to fit within this range + ) + + def test_assert_invalid_fixed_parameter(self): + """Test we cannot create invalid analysis instance with wrong fixed value name.""" + with self.assertRaises(AnalysisError): + # pylint: disable=unused-variable + class InvalidAnalysis(CurveAnalysis): + __series__ = [ + SeriesDef( + fit_func=lambda x, p0: x + p0, + ) + ] + __fixed_parameters__ = ["not_existing"] + + +class CurveAnalysisTestCase(QiskitExperimentsTestCase): + """A baseclass for CurveAnalysis unittest.""" + + seeds = 123 + + @classmethod + def single_sampler(cls, xvalues, yvalues, shots=10000, **metadata): + """A helper function to generate experiment data.""" + rng = np.random.default_rng(seed=cls.seeds) + counts = rng.binomial(shots, yvalues) + data = [ + { + "counts": {"0": shots - c, "1": c}, + "metadata": {"xval": xi, "qubits": (0,), **metadata}, + } + for xi, c in zip(xvalues, counts) + ] + + return data + + @classmethod + def parallel_sampler(cls, xvalues, yvalues1, yvalues2, shots=10000): + """A helper function to generate fake parallel experiment data.""" + rng = np.random.default_rng(seed=cls.seeds) + + data = [] + for xi, p1, p2 in zip(xvalues, yvalues1, yvalues2): + cs = rng.multinomial( + shots, [(1 - p1) * (1 - p2), p1 * (1 - p2), (1 - p1) * p2, p1 * p2] + ) + circ_data = { + "counts": {"00": cs[0], "01": cs[1], "10": cs[2], "11": cs[3]}, + "metadata": { + "composite_index": [0, 1], + "composite_metadata": [{"xval": xi}, {"xval": xi}], + "composite_qubits": [[0], [1]], + "composite_clbits": [[0], [1]], + }, + } + 
data.append(circ_data) + + return data + + +class TestCurveAnalysisUnit(CurveAnalysisTestCase): + """Unittest of CurveAnalysis functionality.""" + + def setUp(self): + super().setUp() + + # Description of test setting + # + # - This model contains three curves, namely, curve1, curve2, curve3 + # - Each curve can be represented by the same function + # - Parameter amp and baseline are shared among all curves + # - Each curve has unique lamb + # - In total 5 parameters in the fit, namely, p0, p1, p2, p3 + # + class MyAnalysis(CurveAnalysis): + """Test analysis""" + + # Note that series def function can take different argument now. + # The signature of composite function is generated on the fly. + __series__ = [ + SeriesDef( + name="curve1", + fit_func=lambda x, p0, p1, p4: fit_function.exponential_decay( + x, amp=p0, lamb=p1, baseline=p4 + ), + filter_kwargs={"type": 1, "valid": True}, + model_description=r"p_0 * \exp(p_1 x) + p4", + ), + SeriesDef( + name="curve2", + fit_func=lambda x, p0, p2, p4: fit_function.exponential_decay( + x, amp=p0, lamb=p2, baseline=p4 + ), + filter_kwargs={"type": 2, "valid": True}, + model_description=r"p_0 * \exp(p_2 x) + p4", + ), + SeriesDef( + name="curve3", + fit_func=lambda x, p0, p3, p4: fit_function.exponential_decay( + x, amp=p0, lamb=p3, baseline=p4 + ), + filter_kwargs={"type": 3, "valid": True}, + model_description=r"p_0 * \exp(p_3 x) + p4", + ), + ] + + self.analysis_cls = MyAnalysis + + def test_parsed_fit_params(self): + """Test parsed fit params.""" + instance = self.analysis_cls() + + # Note that parameters are ordered according to the following rule. + # 1. Take series[0] and add its fittting parameters + # 2. Take next series and its fitting parameters if not exist in the list + # 3. 
Repeat until the last series + self.assertListEqual(instance.fit_params, ["p0", "p1", "p4", "p2", "p3"]) + + def test_parsed_init_guess(self): + """Test parsed initial guess and boundaries.""" + instance = self.analysis_cls() + + ref = {"p0": None, "p1": None, "p2": None, "p3": None, "p4": None} + self.assertDictEqual(instance.options.p0, ref) + self.assertDictEqual(instance.options.bounds, ref) + + def test_data_extraction(self): + """Test data extraction method.""" + shots = 5000000 # something big for data generation unittest + + instance = self.analysis_cls() + instance.set_options(x_key="xval") + + def data_processor(datum): + count = datum["counts"].get("1", 0) + pmean = count / shots + return pmean, pmean * (1 - pmean) / shots + + x = np.linspace(1.0, 5.0, 10) + y1 = fit_function.exponential_decay(x, amp=1.0) + y2 = fit_function.exponential_decay(x, amp=0.5) + + test_data_y1 = self.single_sampler(xvalues=x, yvalues=y1, shots=shots, type=1, valid=True) + test_data_y2 = self.single_sampler(xvalues=x, yvalues=y2, shots=shots, type=2, valid=False) + + expdata = ExperimentData() + expdata.add_data(test_data_y1 + test_data_y2) + + instance._extract_curves(experiment_data=expdata, data_processor=data_processor) + raw_data = instance._data(label="raw_data") + + # check x value + xdata = raw_data.x + ref_x = np.r_[x, x] + np.testing.assert_array_equal(xdata, ref_x) + + # check y value + ydata = raw_data.y + ref_y = np.r_[y1, y2] + np.testing.assert_array_almost_equal(ydata, ref_y, decimal=3) + + # check sigma + sigma = raw_data.y_err + ref_sigma = np.r_[y1 * (1 - y1) / shots, y2 * (1 - y2) / shots] + np.testing.assert_array_almost_equal(sigma, ref_sigma, decimal=3) + + # check data index + index = raw_data.data_index + ref_index = np.r_[np.full(10, 0), np.full(10, -1)] # second value doesn't match; -1 + np.testing.assert_array_equal(index, ref_index) + + def test_get_subset(self): + """Test that get subset data from full data array.""" + instance = 
self.analysis_cls() + instance.set_options(x_key="xval") + + fake_data = [ + {"data": 1, "metadata": {"xval": 1, "type": 1, "valid": True}}, + {"data": 2, "metadata": {"xval": 2, "type": 2, "valid": True}}, + {"data": 3, "metadata": {"xval": 3, "type": 1, "valid": True}}, + {"data": 4, "metadata": {"xval": 4, "type": 3, "valid": True}}, + {"data": 5, "metadata": {"xval": 5, "type": 3, "valid": True}}, + {"data": 6, "metadata": {"xval": 6, "type": 4, "valid": True}}, # this if fake + ] + expdata = ExperimentData() + expdata.add_data(fake_data) + + def data_processor(datum): + return datum["data"], datum["data"] * 2 + + instance._extract_curves(expdata, data_processor=data_processor) + + filt_data = instance._data(series_name="curve1") + np.testing.assert_array_equal(filt_data.x, np.asarray([1, 3], dtype=float)) + np.testing.assert_array_equal(filt_data.y, np.asarray([1, 3], dtype=float)) + np.testing.assert_array_equal(filt_data.y_err, np.asarray([2, 6], dtype=float)) + + filt_data = instance._data(series_name="curve2") + np.testing.assert_array_equal(filt_data.x, np.asarray([2], dtype=float)) + np.testing.assert_array_equal(filt_data.y, np.asarray([2], dtype=float)) + np.testing.assert_array_equal(filt_data.y_err, np.asarray([4], dtype=float)) + + filt_data = instance._data(series_name="curve3") + np.testing.assert_array_equal(filt_data.x, np.asarray([4, 5], dtype=float)) + np.testing.assert_array_equal(filt_data.y, np.asarray([4, 5], dtype=float)) + np.testing.assert_array_equal(filt_data.y_err, np.asarray([8, 10], dtype=float)) + + +class TestCurveAnalysisIntegration(CurveAnalysisTestCase): + """Unittest of CurveAnalysis full functionality. + + Because parameter mapping and fitting feature is already tested in + TestCompositeFunction and TestCurveFit, + this test suite focuses on the entire workflow of curve analysis. 
+ """ + + def test_single_function(self): + """Simple test case with a single curve.""" + p0 = 0.5 + p1 = 3 + + data_processor = DataProcessor(input_key="counts", data_actions=[Probability("1")]) + xvalues = np.linspace(0, 1, 100) + yvalues = fit_function.exponential_decay(xvalues, amp=p0, lamb=p1) + + class MyAnalysis(CurveAnalysis): + __series__ = [ + SeriesDef( + fit_func=lambda x, p0, p1: fit_function.exponential_decay(x, amp=p0, lamb=p1) + ) + ] + + test_data = self.single_sampler(xvalues, yvalues) + expdata = ExperimentData() + expdata.add_data(test_data) + + init_guess = {"p0": 0.4, "p1": 2.9} + instance = MyAnalysis() + instance.set_options( + x_key="xval", + p0=init_guess, + result_parameters=[ParameterRepr("p0", "amp"), ParameterRepr("p1", "lamb")], + data_processor=data_processor, + plot=False, + ) + + run_expdata = instance.run(expdata, replace_results=False) + + all_parameters = run_expdata.analysis_results("@Parameters_MyAnalysis") + p0_analyzed = run_expdata.analysis_results("amp") + p1_analyzed = run_expdata.analysis_results("lamb") + + np.testing.assert_array_almost_equal(all_parameters.value, [p0, p1], decimal=2) + self.assertAlmostEqual(p0_analyzed.value.n, p0, delta=0.05) + self.assertAlmostEqual(p1_analyzed.value.n, p1, delta=0.05) + + def test_extra_entry(self): + """Simple test case analysis add new entry.""" + p0 = 0.5 + p1 = 3 + + data_processor = DataProcessor(input_key="counts", data_actions=[Probability("1")]) + xvalues = np.linspace(0, 1, 100) + yvalues = fit_function.exponential_decay(xvalues, amp=p0, lamb=p1) + + class MyAnalysis(CurveAnalysis): + __series__ = [ + SeriesDef( + fit_func=lambda x, p0, p1: fit_function.exponential_decay(x, amp=p0, lamb=p1) + ) + ] + + def _extra_database_entry(self, fit_data): + return [ + AnalysisResultData( + name="new_value", + value=fit_data.fitval("p0") + fit_data.fitval("p1"), + ) + ] + + test_data = self.single_sampler(xvalues, yvalues) + expdata = ExperimentData() + expdata.add_data(test_data) + 
+ init_guess = {"p0": 0.4, "p1": 2.9} + instance = MyAnalysis() + instance.set_options( + x_key="xval", + p0=init_guess, + data_processor=data_processor, + plot=False, + ) + + run_expdata = instance.run(expdata, replace_results=False) + + new_entry = run_expdata.analysis_results("new_value") + + self.assertAlmostEqual(new_entry.value.n, p0 + p1, delta=0.05) + + def test_evaluate_quality(self): + """Simple test case evaluating quality.""" + p0 = 0.5 + p1 = 3 + + data_processor = DataProcessor(input_key="counts", data_actions=[Probability("1")]) + xvalues = np.linspace(0, 1, 100) + yvalues = fit_function.exponential_decay(xvalues, amp=p0, lamb=p1) + + class MyAnalysis(CurveAnalysis): + __series__ = [ + SeriesDef( + fit_func=lambda x, p0, p1: fit_function.exponential_decay(x, amp=p0, lamb=p1) + ) + ] + + def _evaluate_quality(self, fit_data): + return "evaluated!" + + test_data = self.single_sampler(xvalues, yvalues) + expdata = ExperimentData() + expdata.add_data(test_data) + + init_guess = {"p0": 0.4, "p1": 2.9} + instance = MyAnalysis() + instance.set_options( + x_key="xval", + p0=init_guess, + data_processor=data_processor, + plot=False, + ) + + run_expdata = instance.run(expdata, replace_results=False) + + entry = run_expdata.analysis_results(0) + self.assertEqual(entry.quality, "evaluated!") + + def test_multi_group_fit(self): + """Test case for multi group fit. + + Two curves may have the same fit parameter, but they are independently fit. + Thus fit outcome can have different values. 
+        """
+        p0a = 0.5
+        p1a = 3
+        p0b = 0.6
+        p1b = 2
+
+        data_processor = DataProcessor(input_key="counts", data_actions=[Probability("1")])
+        xvalues = np.linspace(0, 1, 100)
+        yvalues_a = fit_function.exponential_decay(xvalues, amp=p0a, lamb=p1a)
+        yvalues_b = fit_function.exponential_decay(xvalues, amp=p0b, lamb=p1b)
+
+        test_data_a = self.single_sampler(xvalues, yvalues_a, tag="a")
+        test_data_b = self.single_sampler(xvalues, yvalues_b, tag="b")
+        expdata = ExperimentData()
+        expdata.add_data(test_data_a + test_data_b)
+
+        class MyAnalysis(CurveAnalysis):
+            __series__ = [
+                # They have the same fit function signature, but they are separately fit
+                SeriesDef(
+                    fit_func=lambda x, p0, p1: fit_function.exponential_decay(x, amp=p0, lamb=p1),
+                    filter_kwargs={"tag": "a"},
+                    group="a",
+                ),
+                SeriesDef(
+                    fit_func=lambda x, p0, p1: fit_function.exponential_decay(x, amp=p0, lamb=p1),
+                    filter_kwargs={"tag": "b"},
+                    group="b",
+                ),
+            ]
+
+            def _generate_fit_guesses(self, user_opt):
+                if user_opt.group == "a":
+                    user_opt.p0.set_if_empty(p0=0.5, p1=3)
+                else:
+                    user_opt.p0.set_if_empty(p0=0.6, p1=2)
+
+                return user_opt
+
+        instance = MyAnalysis()
+        instance.set_options(
+            x_key="xval",
+            data_processor=data_processor,
+            result_parameters=["p0", "p1"],
+            plot=False,
+        )
+
+        run_expdata = instance.run(expdata, replace_results=False)
+
+        p0s = run_expdata.analysis_results("p0")
+        p1s = run_expdata.analysis_results("p1")
+
+        self.assertEqual(len(p0s), 2)
+        self.assertEqual(len(p1s), 2)
+
+        self.assertAlmostEqual(p0s[0].value.n, p0a, delta=0.05)
+        self.assertEqual(p0s[0].extra["group"], "a")
+
+        self.assertAlmostEqual(p0s[1].value.n, p0b, delta=0.05)
+        self.assertEqual(p0s[1].extra["group"], "b")
+
+    def test_multi_group_fit_one_fail(self):
+        """Test case when one of two fits fail.
+
+        Since the two fits are independent, the other can continue.
+ """ + p0a = 0.5 + p1a = 3 + p0b = 0.6 + p1b = 2 + + data_processor = DataProcessor(input_key="counts", data_actions=[Probability("1")]) + xvalues = np.linspace(0, 1, 100) + yvalues_a = fit_function.exponential_decay(xvalues, amp=p0a, lamb=p1a) + yvalues_b = fit_function.exponential_decay(xvalues, amp=p0b, lamb=p1b) + + test_data_a = self.single_sampler(xvalues, yvalues_a, tag="a") + test_data_b = self.single_sampler(xvalues, yvalues_b, tag="b") + expdata = ExperimentData() + expdata.add_data(test_data_a + test_data_b) + + class MyAnalysis(CurveAnalysis): + __series__ = [ + # They have the same fit function signature, but they are separately fit + SeriesDef( + fit_func=lambda x, p0, p1: fit_function.exponential_decay(x, amp=p0, lamb=p1), + filter_kwargs={"tag": "a"}, + group="a", + ), + SeriesDef( + fit_func=lambda x, p0, p1: fit_function.exponential_decay(x, amp=p0, lamb=p1), + filter_kwargs={"tag": "b"}, + group="b", + ), + ] + + def _generate_fit_guesses(self, user_opt): + if user_opt.group == "a": + user_opt.p0.set_if_empty(p0=0.5, p1=3) + else: + user_opt.bounds.set_if_empty( + p0=(-1, 0), p1=(-1, 0) + ) # invalid range for group b + + return user_opt + + instance = MyAnalysis() + instance.set_options( + x_key="xval", + data_processor=data_processor, + result_parameters=["p0", "p1"], + plot=False, + ) + + run_expdata = instance.run(expdata, replace_results=False) + + p0 = run_expdata.analysis_results("p0") + self.assertAlmostEqual(p0.value.n, p0a, delta=0.05) + self.assertEqual(p0.extra["group"], "a") + + def test_curve_analysis_multi_thread(self): + """Test case for composite analyis. + + Check if analysis works properly when two instances are simultaneously operated + in the multiple threads. Note that composite function is a class attribute + thus it should not be modified during the fit. 
+ """ + p00 = 0.5 + p10 = 3 + + p01 = 0.5 + p11 = 4 + + data_processor = DataProcessor(input_key="counts", data_actions=[Probability("1")]) + xvalues = np.linspace(0, 1, 100) + yvalues_a = fit_function.exponential_decay(xvalues, amp=p00, lamb=p10) + yvalues_b = fit_function.exponential_decay(xvalues, amp=p01, lamb=p11) + + comp_data = self.parallel_sampler(xvalues, yvalues_a, yvalues_b) + + subdata1 = ExperimentData() + subdata2 = ExperimentData() + + composite_expdata = ExperimentData() + composite_expdata.metadata["component_child_index"] = [0, 1] + composite_expdata.add_child_data(subdata1) + composite_expdata.add_child_data(subdata2) + composite_expdata.add_data(comp_data) + + class MyAnalysis(CurveAnalysis): + __series__ = [ + SeriesDef( + fit_func=lambda x, p0, p1: fit_function.exponential_decay(x, amp=p0, lamb=p1) + ) + ] + __fixed_parameters__ = ["p1"] + + @classmethod + def _default_options(cls): + options = super()._default_options() + options.data_processor = data_processor + options.plot = False + options.result_parameters = ["p0"] + options.p0 = {"p0": 0.49} + options.bounds = {"p0": (0.4, 0.6)} + options.p1 = None + + return options + + # Override CompositeFitFunction with different fixed parameters + sub_analysis1 = MyAnalysis() + sub_analysis1.set_options(p1=p10) + sub_analysis2 = MyAnalysis() + sub_analysis2.set_options(p1=p11) + + instance = CompositeAnalysis([sub_analysis1, sub_analysis2]) + run_expdata = instance.run(composite_expdata, replace_results=False).block_for_results() + + p0_sub1 = run_expdata.child_data(0).analysis_results("p0") + self.assertAlmostEqual(p0_sub1.value.n, p00, delta=0.05) + + p0_sub2 = run_expdata.child_data(1).analysis_results("p0") + self.assertAlmostEqual(p0_sub2.value.n, p01, delta=0.05) diff --git a/test/curve_analysis/test_curve_fit.py b/test/curve_analysis/test_curve_fit.py deleted file mode 100644 index 2dddda850e..0000000000 --- a/test/curve_analysis/test_curve_fit.py +++ /dev/null @@ -1,736 +0,0 @@ -# This 
code is part of Qiskit. -# -# (C) Copyright IBM 2021. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. - -# pylint: disable=invalid-name - -"""Test curve fitting base class.""" -from test.base import QiskitExperimentsTestCase -from test.fake_experiment import FakeExperiment -from typing import List - -import numpy as np -from qiskit.qobj.utils import MeasLevel -from uncertainties import correlated_values - -from qiskit_experiments.curve_analysis import CurveAnalysis, fit_function -from qiskit_experiments.curve_analysis.curve_data import ( - SeriesDef, - FitData, - ParameterRepr, - FitOptions, -) -from qiskit_experiments.curve_analysis.data_processing import probability -from qiskit_experiments.exceptions import AnalysisError -from qiskit_experiments.framework import ExperimentData - - -def simulate_output_data(func, xvals, param_dict, **metadata): - """Generate arbitrary fit data.""" - __shots = 100000 - - expected_probs = func(xvals, **param_dict) - counts = np.asarray(expected_probs * __shots, dtype=int) - - data = [ - { - "counts": {"0": __shots - count, "1": count}, - "metadata": dict(xval=xi, qubits=(0,), experiment_type="fake_experiment", **metadata), - } - for xi, count in zip(xvals, counts) - ] - - expdata = ExperimentData(experiment=FakeExperiment()) - for datum in data: - expdata.add_data(datum) - - expdata.metadata["job_metadata"] = [{"run_options": {"meas_level": MeasLevel.CLASSIFIED}}] - - return expdata - - -def create_new_analysis(series: List[SeriesDef], fixed_params: List[str] = None) -> CurveAnalysis: - """A helper function to create a mock analysis class instance.""" - - class 
TestAnalysis(CurveAnalysis): - """A mock analysis class to test.""" - - __series__ = series - __fixed_parameters__ = fixed_params or list() - - return TestAnalysis() - - -class TestFitData(QiskitExperimentsTestCase): - """Unittest for fit data dataclass.""" - - def test_get_value(self): - """Get fit value from fit data object.""" - pcov = np.diag(np.ones(3)) - popt = np.asarray([1.0, 2.0, 3.0]) - fit_params = correlated_values(popt, pcov) - - data = FitData( - popt=fit_params, - popt_keys=["a", "b", "c"], - pcov=pcov, - reduced_chisq=0.0, - dof=0, - x_range=(0, 0), - y_range=(0, 0), - ) - - a_val = data.fitval("a") - self.assertEqual(a_val, fit_params[0]) - - b_val = data.fitval("b") - self.assertEqual(b_val, fit_params[1]) - - c_val = data.fitval("c") - self.assertEqual(c_val, fit_params[2]) - - -class TestCurveAnalysisUnit(QiskitExperimentsTestCase): - """Unittest for curve fit analysis.""" - - def setUp(self): - super().setUp() - self.xvalues = np.linspace(1.0, 5.0, 10) - - # Description of test setting - # - # - This model contains three curves, namely, curve1, curve2, curve3 - # - Each curve can be represented by the same function - # - Parameter amp and baseline are shared among all curves - # - Each curve has unique lamb - # - In total 5 parameters in the fit, namely, p0, p1, p2, p3 - # - self.analysis = create_new_analysis( - series=[ - SeriesDef( - name="curve1", - fit_func=lambda x, p0, p1, p2, p3, p4: fit_function.exponential_decay( - x, amp=p0, lamb=p1, baseline=p4 - ), - filter_kwargs={"type": 1, "valid": True}, - model_description=r"p_0 * \exp(p_1 x) + p4", - ), - SeriesDef( - name="curve2", - fit_func=lambda x, p0, p1, p2, p3, p4: fit_function.exponential_decay( - x, amp=p0, lamb=p2, baseline=p4 - ), - filter_kwargs={"type": 2, "valid": True}, - model_description=r"p_0 * \exp(p_2 x) + p4", - ), - SeriesDef( - name="curve3", - fit_func=lambda x, p0, p1, p2, p3, p4: fit_function.exponential_decay( - x, amp=p0, lamb=p3, baseline=p4 - ), - 
filter_kwargs={"type": 3, "valid": True}, - model_description=r"p_0 * \exp(p_3 x) + p4", - ), - ], - ) - self.err_decimal = 3 - - def test_parsed_fit_params(self): - """Test parsed fit params.""" - self.assertSetEqual(set(self.analysis._fit_params()), {"p0", "p1", "p2", "p3", "p4"}) - - def test_parsed_init_guess(self): - """Test parsed initial guess and boundaries.""" - default_p0 = self.analysis._default_options().p0 - default_bounds = self.analysis._default_options().bounds - ref = {"p0": None, "p1": None, "p2": None, "p3": None, "p4": None} - self.assertDictEqual(default_p0, ref) - self.assertDictEqual(default_bounds, ref) - - def test_cannot_create_invalid_series_fit(self): - """Test we cannot create invalid analysis instance.""" - invalid_series = [ - SeriesDef( - name="fit1", - fit_func=lambda x, p0: fit_function.exponential_decay(x, amp=p0), - ), - SeriesDef( - name="fit2", - fit_func=lambda x, p1: fit_function.exponential_decay(x, amp=p1), - ), - ] - with self.assertRaises(AnalysisError): - create_new_analysis(series=invalid_series) # fit1 has param p0 while fit2 has p1 - - def test_cannot_create_invalid_fixed_parameter(self): - """Test we cannot create invalid analysis instance with wrong fixed value name.""" - valid_series = [ - SeriesDef( - fit_func=lambda x, p0, p1: fit_function.exponential_decay(x, amp=p0, lamb=p1), - ), - ] - with self.assertRaises(AnalysisError): - create_new_analysis( - series=valid_series, - fixed_params=["not_existing_parameter"], # this parameter is not defined - ) - - def test_data_extraction(self): - """Test data extraction method.""" - self.analysis.set_options(x_key="xval") - - # data to analyze - test_data0 = simulate_output_data( - func=fit_function.exponential_decay, - xvals=self.xvalues, - param_dict={"amp": 1.0}, - type=1, - valid=True, - ) - - # fake data - test_data1 = simulate_output_data( - func=fit_function.exponential_decay, - xvals=self.xvalues, - param_dict={"amp": 0.5}, - type=2, - valid=False, - ) - - # merge 
two experiment data - for datum in test_data1.data(): - test_data0.add_data(datum) - - self.analysis._extract_curves( - experiment_data=test_data0, data_processor=probability(outcome="1") - ) - - raw_data = self.analysis._data(label="raw_data") - - xdata = raw_data.x - ydata = raw_data.y - sigma = raw_data.y_err - d_index = raw_data.data_index - - # check if the module filter off data: valid=False - self.assertEqual(len(xdata), 20) - - # check x values - ref_x = np.concatenate((self.xvalues, self.xvalues)) - np.testing.assert_array_almost_equal(xdata, ref_x) - - # check y values - ref_y = np.concatenate( - ( - fit_function.exponential_decay(self.xvalues, amp=1.0), - fit_function.exponential_decay(self.xvalues, amp=0.5), - ) - ) - np.testing.assert_array_almost_equal(ydata, ref_y, decimal=self.err_decimal) - - # check series - ref_series = np.concatenate((np.zeros(10, dtype=int), -1 * np.ones(10, dtype=int))) - self.assertListEqual(list(d_index), list(ref_series)) - - # check y errors - ref_yerr = ref_y * (1 - ref_y) / 100000 - np.testing.assert_array_almost_equal(sigma, ref_yerr, decimal=self.err_decimal) - - def test_get_subset(self): - """Test that get subset data from full data array.""" - # data to analyze - fake_data = [ - {"data": 1, "metadata": {"xval": 1, "type": 1, "valid": True}}, - {"data": 2, "metadata": {"xval": 2, "type": 2, "valid": True}}, - {"data": 3, "metadata": {"xval": 3, "type": 1, "valid": True}}, - {"data": 4, "metadata": {"xval": 4, "type": 3, "valid": True}}, - {"data": 5, "metadata": {"xval": 5, "type": 3, "valid": True}}, - {"data": 6, "metadata": {"xval": 6, "type": 4, "valid": True}}, # this if fake - ] - expdata = ExperimentData(experiment=FakeExperiment()) - for datum in fake_data: - expdata.add_data(datum) - - def _processor(datum): - return datum["data"], datum["data"] * 2 - - self.analysis.set_options(x_key="xval") - self.analysis._extract_curves(expdata, data_processor=_processor) - - filt_data = 
self.analysis._data(series_name="curve1") - np.testing.assert_array_equal(filt_data.x, np.asarray([1, 3], dtype=float)) - np.testing.assert_array_equal(filt_data.y, np.asarray([1, 3], dtype=float)) - np.testing.assert_array_equal(filt_data.y_err, np.asarray([2, 6], dtype=float)) - - filt_data = self.analysis._data(series_name="curve2") - np.testing.assert_array_equal(filt_data.x, np.asarray([2], dtype=float)) - np.testing.assert_array_equal(filt_data.y, np.asarray([2], dtype=float)) - np.testing.assert_array_equal(filt_data.y_err, np.asarray([4], dtype=float)) - - filt_data = self.analysis._data(series_name="curve3") - np.testing.assert_array_equal(filt_data.x, np.asarray([4, 5], dtype=float)) - np.testing.assert_array_equal(filt_data.y, np.asarray([4, 5], dtype=float)) - np.testing.assert_array_equal(filt_data.y_err, np.asarray([8, 10], dtype=float)) - - -class TestCurveAnalysisIntegration(QiskitExperimentsTestCase): - """Integration test for curve fit analysis through entire analysis.run function.""" - - def setUp(self): - super().setUp() - self.xvalues = np.linspace(0.1, 1, 50) - self.err_decimal = 2 - - def test_run_single_curve_analysis(self): - """Test analysis for single curve.""" - analysis = create_new_analysis( - series=[ - SeriesDef( - name="curve1", - fit_func=lambda x, p0, p1, p2, p3: fit_function.exponential_decay( - x, amp=p0, lamb=p1, x0=p2, baseline=p3 - ), - model_description=r"p_0 \exp(p_1 x + p_2) + p_3", - ) - ], - ) - ref_p0 = 0.9 - ref_p1 = 2.5 - ref_p2 = 0.0 - ref_p3 = 0.1 - - test_data = simulate_output_data( - func=fit_function.exponential_decay, - xvals=self.xvalues, - param_dict={"amp": ref_p0, "lamb": ref_p1, "x0": ref_p2, "baseline": ref_p3}, - ) - analysis.set_options( - p0={"p0": ref_p0, "p1": ref_p1, "p2": ref_p2, "p3": ref_p3}, - result_parameters=[ParameterRepr("p1", "parameter_name", "unit")], - ) - - results, _ = analysis._run_analysis(test_data) - result = results[0] - - ref_popt = np.asarray([ref_p0, ref_p1, ref_p2, ref_p3]) - 
- # check result data - np.testing.assert_array_almost_equal(result.value, ref_popt, decimal=self.err_decimal) - self.assertEqual(result.extra["dof"], 46) - self.assertListEqual(result.extra["popt_keys"], ["p0", "p1", "p2", "p3"]) - self.assertDictEqual(result.extra["fit_models"], {"curve1": r"p_0 \exp(p_1 x + p_2) + p_3"}) - - # special entry formatted for database - result = results[1] - self.assertEqual(result.name, "parameter_name") - self.assertEqual(result.extra["unit"], "unit") - self.assertAlmostEqual(result.value.nominal_value, ref_p1, places=self.err_decimal) - - def test_run_single_curve_fail(self): - """Test analysis returns status when it fails.""" - analysis = create_new_analysis( - series=[ - SeriesDef( - name="curve1", - fit_func=lambda x, p0, p1, p2, p3: fit_function.exponential_decay( - x, amp=p0, lamb=p1, x0=p2, baseline=p3 - ), - ) - ], - ) - ref_p0 = 0.9 - ref_p1 = 2.5 - ref_p2 = 0.0 - ref_p3 = 0.1 - - test_data = simulate_output_data( - func=fit_function.exponential_decay, - xvals=self.xvalues, - param_dict={"amp": ref_p0, "lamb": ref_p1, "x0": ref_p2, "baseline": ref_p3}, - ) - analysis.set_options( - p0={"p0": ref_p0, "p1": ref_p1, "p2": ref_p2, "p3": ref_p3}, - bounds={"p0": [-10, 0], "p1": [-10, 0], "p2": [-10, 0], "p3": [-10, 0]}, - return_data_points=True, - ) - - # Try to fit with infeasible parameter boundary. This should fail. - results, _ = analysis._run_analysis(test_data) - - # This returns only data point entry - self.assertEqual(len(results), 1) - self.assertEqual(results[0].name, "@Data_TestAnalysis") - - def test_run_two_curves_with_same_fitfunc(self): - """Test analysis for two curves. 
Curves shares fit model.""" - analysis = create_new_analysis( - series=[ - SeriesDef( - name="curve1", - fit_func=lambda x, p0, p1, p2, p3, p4: fit_function.exponential_decay( - x, amp=p0, lamb=p1, x0=p3, baseline=p4 - ), - filter_kwargs={"exp": 0}, - ), - SeriesDef( - name="curve1", - fit_func=lambda x, p0, p1, p2, p3, p4: fit_function.exponential_decay( - x, amp=p0, lamb=p2, x0=p3, baseline=p4 - ), - filter_kwargs={"exp": 1}, - ), - ], - ) - ref_p0 = 0.9 - ref_p1 = 7.0 - ref_p2 = 5.0 - ref_p3 = 0.0 - ref_p4 = 0.1 - - test_data0 = simulate_output_data( - func=fit_function.exponential_decay, - xvals=self.xvalues, - param_dict={"amp": ref_p0, "lamb": ref_p1, "x0": ref_p3, "baseline": ref_p4}, - exp=0, - ) - - test_data1 = simulate_output_data( - func=fit_function.exponential_decay, - xvals=self.xvalues, - param_dict={"amp": ref_p0, "lamb": ref_p2, "x0": ref_p3, "baseline": ref_p4}, - exp=1, - ) - - # merge two experiment data - for datum in test_data1.data(): - test_data0.add_data(datum) - - analysis.set_options( - p0={"p0": ref_p0, "p1": ref_p1, "p2": ref_p2, "p3": ref_p3, "p4": ref_p4} - ) - results, _ = analysis._run_analysis(test_data0) - result = results[0] - - ref_popt = np.asarray([ref_p0, ref_p1, ref_p2, ref_p3, ref_p4]) - - # check result data - np.testing.assert_array_almost_equal(result.value, ref_popt, decimal=self.err_decimal) - - def test_run_two_curves_with_two_fitfuncs(self): - """Test analysis for two curves. 
Curves shares fit parameters.""" - analysis = create_new_analysis( - series=[ - SeriesDef( - name="curve1", - fit_func=lambda x, p0, p1, p2, p3: fit_function.cos( - x, amp=p0, freq=p1, phase=p2, baseline=p3 - ), - filter_kwargs={"exp": 0}, - ), - SeriesDef( - name="curve2", - fit_func=lambda x, p0, p1, p2, p3: fit_function.sin( - x, amp=p0, freq=p1, phase=p2, baseline=p3 - ), - filter_kwargs={"exp": 1}, - ), - ], - ) - ref_p0 = 0.1 - ref_p1 = 2 - ref_p2 = -0.3 - ref_p3 = 0.5 - - test_data0 = simulate_output_data( - func=fit_function.cos, - xvals=self.xvalues, - param_dict={"amp": ref_p0, "freq": ref_p1, "phase": ref_p2, "baseline": ref_p3}, - exp=0, - ) - - test_data1 = simulate_output_data( - func=fit_function.sin, - xvals=self.xvalues, - param_dict={"amp": ref_p0, "freq": ref_p1, "phase": ref_p2, "baseline": ref_p3}, - exp=1, - ) - - # merge two experiment data - for datum in test_data1.data(): - test_data0.add_data(datum) - - analysis.set_options(p0={"p0": ref_p0, "p1": ref_p1, "p2": ref_p2, "p3": ref_p3}) - results, _ = analysis._run_analysis(test_data0) - result = results[0] - - ref_popt = np.asarray([ref_p0, ref_p1, ref_p2, ref_p3]) - - # check result data - np.testing.assert_array_almost_equal(result.value, ref_popt, decimal=self.err_decimal) - - def test_run_fixed_parameters(self): - """Test analysis when some of parameters are fixed.""" - analysis = create_new_analysis( - series=[ - SeriesDef( - name="curve1", - fit_func=lambda x, p0, p1, fixed_p2, p3: fit_function.cos( - x, amp=p0, freq=p1, phase=fixed_p2, baseline=p3 - ), - ), - ], - fixed_params=["fixed_p2"], - ) - - ref_p0 = 0.1 - ref_p1 = 2 - ref_p2 = -0.3 - ref_p3 = 0.5 - - test_data = simulate_output_data( - func=fit_function.cos, - xvals=self.xvalues, - param_dict={"amp": ref_p0, "freq": ref_p1, "phase": ref_p2, "baseline": ref_p3}, - ) - - analysis.set_options( - p0={"p0": ref_p0, "p1": ref_p1, "p3": ref_p3}, - fixed_p2=ref_p2, - ) - - results, _ = analysis._run_analysis(test_data) - result = 
results[0] - - ref_popt = np.asarray([ref_p0, ref_p1, ref_p3]) - - # check result data - np.testing.assert_array_almost_equal(result.value, ref_popt, decimal=self.err_decimal) - - def test_fixed_param_is_missing(self): - """Test raising an analysis error when fixed parameter is missing.""" - analysis = create_new_analysis( - series=[ - SeriesDef( - name="curve1", - fit_func=lambda x, p0, p1, fixed_p2, p3: fit_function.cos( - x, amp=p0, freq=p1, phase=fixed_p2, baseline=p3 - ), - ), - ], - fixed_params=["fixed_p2"], - ) - - ref_p0 = 0.1 - ref_p1 = 2 - ref_p2 = -0.3 - ref_p3 = 0.5 - - test_data = simulate_output_data( - func=fit_function.cos, - xvals=self.xvalues, - param_dict={"amp": ref_p0, "freq": ref_p1, "phase": ref_p2, "baseline": ref_p3}, - ) - # do not define fixed_p2 here - analysis.set_options(p0={"p0": ref_p0, "p1": ref_p1, "p3": ref_p3}) - with self.assertRaises(AnalysisError): - analysis._run_analysis(test_data) - - -class TestFitOptions(QiskitExperimentsTestCase): - """Unittest for fit option object.""" - - def test_empty(self): - """Test if default value is automatically filled.""" - opt = FitOptions(["p0", "p1", "p2"]) - - # bounds should be default to inf tuple. otherwise crashes the scipy fitter. 
- ref_opts = { - "p0": {"p0": None, "p1": None, "p2": None}, - "bounds": {"p0": (-np.inf, np.inf), "p1": (-np.inf, np.inf), "p2": (-np.inf, np.inf)}, - } - - self.assertDictEqual(opt.options, ref_opts) - - def test_create_option_with_dict(self): - """Create option and fill with dictionary.""" - opt = FitOptions( - ["p0", "p1", "p2"], - default_p0={"p0": 0, "p1": 1, "p2": 2}, - default_bounds={"p0": (0, 1), "p1": (1, 2), "p2": (2, 3)}, - ) - - ref_opts = { - "p0": {"p0": 0.0, "p1": 1.0, "p2": 2.0}, - "bounds": {"p0": (0.0, 1.0), "p1": (1.0, 2.0), "p2": (2.0, 3.0)}, - } - - self.assertDictEqual(opt.options, ref_opts) - - def test_create_option_with_array(self): - """Create option and fill with array.""" - opt = FitOptions( - ["p0", "p1", "p2"], - default_p0=[0, 1, 2], - default_bounds=[(0, 1), (1, 2), (2, 3)], - ) - - ref_opts = { - "p0": {"p0": 0.0, "p1": 1.0, "p2": 2.0}, - "bounds": {"p0": (0.0, 1.0), "p1": (1.0, 2.0), "p2": (2.0, 3.0)}, - } - - self.assertDictEqual(opt.options, ref_opts) - - def test_override_partial_dict(self): - """Create option and override value with partial dictionary.""" - opt = FitOptions(["p0", "p1", "p2"]) - opt.p0.set_if_empty(p1=3) - - ref_opts = { - "p0": {"p0": None, "p1": 3.0, "p2": None}, - "bounds": {"p0": (-np.inf, np.inf), "p1": (-np.inf, np.inf), "p2": (-np.inf, np.inf)}, - } - - self.assertDictEqual(opt.options, ref_opts) - - def test_cannot_override_assigned_value(self): - """Test cannot override already assigned value.""" - opt = FitOptions(["p0", "p1", "p2"]) - opt.p0.set_if_empty(p1=3) - opt.p0.set_if_empty(p1=5) - - ref_opts = { - "p0": {"p0": None, "p1": 3.0, "p2": None}, - "bounds": {"p0": (-np.inf, np.inf), "p1": (-np.inf, np.inf), "p2": (-np.inf, np.inf)}, - } - - self.assertDictEqual(opt.options, ref_opts) - - def test_can_override_assigned_value_with_dict_access(self): - """Test override already assigned value with direct dict access.""" - opt = FitOptions(["p0", "p1", "p2"]) - opt.p0["p1"] = 3 - opt.p0["p1"] = 5 - - 
ref_opts = { - "p0": {"p0": None, "p1": 5.0, "p2": None}, - "bounds": {"p0": (-np.inf, np.inf), "p1": (-np.inf, np.inf), "p2": (-np.inf, np.inf)}, - } - - self.assertDictEqual(opt.options, ref_opts) - - def test_cannot_override_user_option(self): - """Test cannot override already assigned value.""" - opt = FitOptions(["p0", "p1", "p2"], default_p0={"p1": 3}) - opt.p0.set_if_empty(p1=5) - - ref_opts = { - "p0": {"p0": None, "p1": 3, "p2": None}, - "bounds": {"p0": (-np.inf, np.inf), "p1": (-np.inf, np.inf), "p2": (-np.inf, np.inf)}, - } - - self.assertDictEqual(opt.options, ref_opts) - - def test_set_operation(self): - """Test if set works and duplicated entry is removed.""" - opt1 = FitOptions(["p0", "p1"], default_p0=[0, 1]) - opt2 = FitOptions(["p0", "p1"], default_p0=[0, 1]) - opt3 = FitOptions(["p0", "p1"], default_p0=[0, 2]) - - opts = set() - opts.add(opt1) - opts.add(opt2) - opts.add(opt3) - - self.assertEqual(len(opts), 2) - - def test_detect_invalid_p0(self): - """Test if invalid p0 raises Error.""" - with self.assertRaises(AnalysisError): - # less element - FitOptions(["p0", "p1", "p2"], default_p0=[0, 1]) - - def test_detect_invalid_bounds(self): - """Test if invalid bounds raises Error.""" - with self.assertRaises(AnalysisError): - # less element - FitOptions(["p0", "p1", "p2"], default_bounds=[(0, 1), (1, 2)]) - - with self.assertRaises(AnalysisError): - # not min-max tuple - FitOptions(["p0", "p1", "p2"], default_bounds=[0, 1, 2]) - - with self.assertRaises(AnalysisError): - # max-min tuple - FitOptions(["p0", "p1", "p2"], default_bounds=[(1, 0), (2, 1), (3, 2)]) - - def test_detect_invalid_key(self): - """Test if invalid key raises Error.""" - opt = FitOptions(["p0", "p1", "p2"]) - - with self.assertRaises(AnalysisError): - opt.p0.set_if_empty(p3=3) - - def test_set_extra_options(self): - """Add extra fitter options.""" - opt = FitOptions( - ["p0", "p1", "p2"], default_p0=[0, 1, 2], default_bounds=[(0, 1), (1, 2), (2, 3)] - ) - 
opt.add_extra_options(ex1=0, ex2=1) - - ref_opts = { - "p0": {"p0": 0.0, "p1": 1.0, "p2": 2.0}, - "bounds": {"p0": (0.0, 1.0), "p1": (1.0, 2.0), "p2": (2.0, 3.0)}, - "ex1": 0, - "ex2": 1, - } - - self.assertDictEqual(opt.options, ref_opts) - - def test_complicated(self): - """Test for realistic operations for algorithmic guess with user options.""" - user_p0 = {"p0": 1, "p1": None} - user_bounds = {"p0": None, "p1": (-100, 100)} - - opt = FitOptions( - ["p0", "p1", "p2"], - default_p0=user_p0, - default_bounds=user_bounds, - ) - - # similar computation in algorithmic guess - - opt.p0.set_if_empty(p0=5) # this is ignored because user already provided initial guess - opt.p0.set_if_empty(p1=opt.p0["p0"] * 2 + 3) # user provided guess propagates - - opt.bounds.set_if_empty(p0=(0, 10)) # this will be set - opt.add_extra_options(fitter="algo1") - - opt1 = opt.copy() # copy options while keeping previous values - opt1.p0.set_if_empty(p2=opt1.p0["p0"] + opt1.p0["p1"]) - - opt2 = opt.copy() - opt2.p0.set_if_empty(p2=opt2.p0["p0"] * 2) # add another p2 value - - ref_opt1 = { - "p0": {"p0": 1.0, "p1": 5.0, "p2": 6.0}, - "bounds": {"p0": (0.0, 10.0), "p1": (-100.0, 100.0), "p2": (-np.inf, np.inf)}, - "fitter": "algo1", - } - - ref_opt2 = { - "p0": {"p0": 1.0, "p1": 5.0, "p2": 2.0}, - "bounds": {"p0": (0.0, 10.0), "p1": (-100.0, 100.0), "p2": (-np.inf, np.inf)}, - "fitter": "algo1", - } - - self.assertDictEqual(opt1.options, ref_opt1) - self.assertDictEqual(opt2.options, ref_opt2)