Source code for evo_gafs.core.selector

"""The :class:`GAFeatureSelector` estimator."""

from __future__ import annotations

import logging
import random
import time
from dataclasses import replace
from typing import Callable, Union

import numpy as np
import pandas as pd
from deap import base, tools
from sklearn.base import BaseEstimator
from sklearn.feature_selection import SelectorMixin
from sklearn.model_selection import KFold, StratifiedKFold
from sklearn.utils.validation import check_is_fitted, validate_data

from evo_gafs.algorithms.nsga2 import run_nsga2
from evo_gafs.algorithms.single import run_single_objective
from evo_gafs.core.config import EvolutionStats, GAConfig, SelectionResult
from evo_gafs.core.evaluator import FitnessEvaluator
from evo_gafs.operators.crossover import cx_uniform_with_repair
from evo_gafs.operators.mutation import mut_flip_with_repair
from evo_gafs.operators.repair import init_individual
from evo_gafs.utils.deap_utils import create_types
from evo_gafs.utils.validation import infer_task_type, prepare_target, resolve_scoring

logger = logging.getLogger("evo_gafs")

ArrayLike = Union[np.ndarray, pd.DataFrame]
Callback = Callable[[int, EvolutionStats, list], bool]


[docs] class GAFeatureSelector(SelectorMixin, BaseEstimator): """Genetic-algorithm wrapper feature selector, compatible with scikit-learn. The selector searches for the subset of features that maximises a cross-validated score of ``estimator`` (the wrapper criterion), optionally trading raw performance for a smaller feature set. Parameters ---------- estimator : sklearn estimator Model used to score candidate feature subsets. Must implement ``fit`` and ``predict``. Fast estimators (decision trees, linear models) keep the search affordable. It is cloned for every evaluation, never fitted in place. config : GAConfig, optional Genetic-algorithm configuration. If ``None``, defaults are used. scoring : str, optional scikit-learn scoring string (e.g. ``'accuracy'``, ``'f1_macro'``, ``'r2'``, ``'neg_mean_squared_error'``). If ``None`` it is chosen from ``task_type``. task_type : {'auto', 'classification', 'regression'}, default='auto' Problem type. ``'auto'`` infers it from ``y``. feature_names : list of str, optional Names of the input features. Inferred from a DataFrame's columns or generated as ``f0, f1, ...`` when not given. Attributes ---------- result_ : SelectionResult Full result of the run (set after :meth:`fit`). support_ : numpy.ndarray of bool Boolean mask of selected features. n_features_in_ : int Number of features seen during :meth:`fit`. feature_names_in_ : numpy.ndarray Names of features seen during :meth:`fit` (only when ``X`` is a DataFrame). Examples -------- >>> from sklearn.tree import DecisionTreeClassifier >>> from evo_gafs import GAFeatureSelector, GAConfig >>> config = GAConfig(population_size=20, n_generations=10, verbose=False) >>> selector = GAFeatureSelector( ... estimator=DecisionTreeClassifier(random_state=42), ... config=config, ... ) >>> selector.fit(X_train, y_train) # doctest: +SKIP >>> X_selected = selector.transform(X_test) # doctest: +SKIP """ def __init__( self, estimator: BaseEstimator, config: GAConfig | None = None, scoring: str | None = None, task_type: str = "auto", feature_names: list[str] | None = None, ) -> None: self.estimator = estimator self.config = config self.scoring = scoring self.task_type = task_type self.feature_names = feature_names # ── scikit-learn API ──────────────────────────────────────────────────────
[docs] def fit( self, X: ArrayLike, y: ArrayLike, callbacks: list[Callback] | None = None ) -> GAFeatureSelector: """Run the genetic algorithm to find the best feature subset. Parameters ---------- X : array-like or pandas.DataFrame of shape (n_samples, n_features) Training data. Sparse matrices are not supported. y : array-like of shape (n_samples,) Target values. Integers/strings for classification, floats for regression. callbacks : list of callable, optional Functions ``f(gen, stats, population) -> bool``. Returning ``True`` stops evolution early. Returns ------- self : GAFeatureSelector The fitted selector. """ config = self.config if self.config is not None else GAConfig() config.validate() # validate_data rejects complex/NaN input and records n_features_in_ # and (for DataFrames) feature_names_in_. X_array = validate_data(self, X, reset=True, dtype="numeric") y_array = prepare_target(y) if y_array.shape[0] != X_array.shape[0]: raise ValueError(f"X has {X_array.shape[0]} samples but y has {y_array.shape[0]}.") n_features = X_array.shape[1] feature_names = self._resolve_names(n_features) task_type = infer_task_type(y_array, self.task_type) scoring = resolve_scoring(self.scoring, task_type) cv = self._build_cv(y_array, task_type, config) # Resolve mutation_indpb without mutating the user's config object. effective_config = replace( config, mutation_indpb=( config.mutation_indpb if config.mutation_indpb is not None else 1.0 / n_features ), ) logger.info( "GA feature selection | n_features=%d | mode=%s | scoring=%s | task=%s", n_features, effective_config.mode, scoring, task_type, ) if effective_config.random_seed is not None: random.seed(effective_config.random_seed) np.random.seed(effective_config.random_seed) evaluator = FitnessEvaluator( estimator=self.estimator, X=X_array, y=y_array, scoring=scoring, cv=cv, config=effective_config, ) deap_types = create_types(effective_config.mode) try: toolbox = self._build_toolbox( n_features, effective_config, deap_types.individual_cls, evaluator ) start = time.time() if effective_config.mode == "multiobjective": population, _, history = run_nsga2(toolbox, effective_config, callbacks) else: population, _, history = run_single_objective(toolbox, effective_config, callbacks) total_time = time.time() - start result = self._extract_result( population=population, history=history, feature_names=feature_names, n_features=n_features, evaluator=evaluator, config=effective_config, total_time=total_time, ) finally: deap_types.cleanup() self.result_ = result self.support_ = result.selected_mask if effective_config.verbose: print(result.summary()) return self
def _get_support_mask(self) -> np.ndarray: """Return the boolean mask of selected features (sklearn contract).""" check_is_fitted(self, "support_") return self.support_ # ── Convenience ─────────────────────────────────────────────────────────--
[docs] def summary(self) -> str: """Return a human-readable summary of the fitted result.""" check_is_fitted(self, "result_") return self.result_.summary()
# ── Internals ───────────────────────────────────────────────────────────-- def _resolve_names(self, n_features: int) -> list[str]: """Resolve feature names for reporting (DataFrame > explicit > generated).""" if getattr(self, "feature_names_in_", None) is not None: return [str(name) for name in self.feature_names_in_] if self.feature_names is not None: if len(self.feature_names) != n_features: raise ValueError( f"feature_names has length {len(self.feature_names)} but X has " f"{n_features} columns." ) return list(self.feature_names) return [f"f{i}" for i in range(n_features)] def _build_cv(self, y: np.ndarray, task_type: str, config: GAConfig) -> object: """Build a CV splitter, clamping folds to the smallest class if needed.""" if task_type == "classification": _, counts = np.unique(y, return_counts=True) n_splits = max(2, min(config.cv_folds, int(counts.min()))) return StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=config.random_seed) n_splits = max(2, min(config.cv_folds, y.shape[0])) return KFold(n_splits=n_splits, shuffle=True, random_state=config.random_seed) def _build_toolbox( self, n_features: int, config: GAConfig, individual_cls: type, evaluator: FitnessEvaluator, ) -> base.Toolbox: """Configure the DEAP toolbox with all genetic operators.""" toolbox = base.Toolbox() toolbox.register( "individual", init_individual, ind_class=individual_cls, n_features=n_features, min_features=config.min_features, ) toolbox.register("population", tools.initRepeat, list, toolbox.individual) toolbox.register("evaluate", evaluator) toolbox.register( "mate", cx_uniform_with_repair, indpb=0.5, min_features=config.min_features ) toolbox.register( "mutate", mut_flip_with_repair, indpb=config.mutation_indpb, min_features=config.min_features, ) if config.mode == "multiobjective": toolbox.register("select", tools.selNSGA2) else: toolbox.register("select", tools.selTournament, tournsize=config.tournament_size) return toolbox def _extract_result( self, population: list, history: list[EvolutionStats], feature_names: list[str], n_features: int, evaluator: FitnessEvaluator, config: GAConfig, total_time: float, ) -> SelectionResult: """Extract the best individual and assemble a :class:`SelectionResult`.""" pareto_data: list[dict] | None = None if config.mode == "multiobjective": front = tools.sortNondominated(population, len(population), first_front_only=True)[0] best_ind = max(front, key=lambda ind: ind.fitness.values[0]) best_cv_score = float(best_ind.fitness.values[0]) best_fitness = best_cv_score pareto_data = [ { "mask": list(ind), "cv_score": float(ind.fitness.values[0]), "compression": float(ind.fitness.values[1]), "n_features": int(sum(ind)), } for ind in front ] else: best_ind = tools.selBest(population, 1)[0] best_fitness = float(best_ind.fitness.values[0]) selected = [i for i, bit in enumerate(best_ind) if bit == 1] best_cv_score = evaluator.cv_score(selected) mask = np.array([bool(bit) for bit in best_ind]) indices = np.where(mask)[0] n_selected = int(mask.sum()) return SelectionResult( selected_mask=mask, selected_indices=indices, selected_feature_names=[feature_names[i] for i in indices], best_fitness=best_fitness, best_cv_score=best_cv_score, n_selected=n_selected, compression_ratio=1.0 - (n_selected / n_features), history=history, pareto_front=pareto_data, config=config, total_time=total_time, n_evaluations=evaluator.eval_count, ) # ── Estimator tags (scikit-learn >= 1.6) ────────────────────────────────── def __sklearn_tags__(self): tags = super().__sklearn_tags__() tags.target_tags.required = True tags.input_tags.allow_nan = False return tags