API Reference

CellViability: A package for analyzing cell viability data from HTS assays.

CellViabilityProtocol

A class to represent a cell viability analysis protocol.

Attributes

config : dict A dictionary containing configuration for the protocol. name : str The name of the protocol. verbose : bool A flag to indicate whether to run in verbose mode.

Methods

_load_screening() -> Screening Loads and returns a Screening instance. execute() Executes the cell viability analysis protocol.

Source code in CellViability/protocols/cv.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
class CellViabilityProtocol:
    """
    A class to represent a cell viability analysis protocol.

    Attributes
    ----------
    config : dict
        A dictionary containing configuration for the protocol.
    name : str
        The name of the protocol.
    verbose : bool
        A flag to indicate whether to run in verbose mode.

    Methods
    -------
    _load_screening() -> Screening
        Loads and returns a Screening instance.
    execute()
        Executes the cell viability analysis protocol.
    """

    def __init__(self, config: dict, name: str, basedir: str = "results", verbose: bool = False) -> None:
        self.config: dict = config
        self.screen: Screen = self._load_screen(config, name)
        self.model: stardist.models.model2d.StarDist2D | None = None
        self.basedir: str = basedir
        self.verbose: bool = verbose

    # -------------------------------------------------------------------------
    # Loading and setup
    # -------------------------------------------------------------------------
    def _create_directories(self, plate: Plate, npy: bool = False, instances: bool = False) -> None:
        """
        Creates necessary directories for storing results.

        Parameters
        ----------
        plate : str
            The name of the plate.
        npy : bool, optional
            If True, creates a directory for .npy files, by default False.
        instances : bool, optional
            If True, creates a directory for instance segmentation masks, by default False.
        """
        platedir = os.path.join(self.basedir, self.screen.name, plate.name)
        os.makedirs(platedir, exist_ok=True)

        if instances:
            os.makedirs(os.path.join(platedir, "instances"), exist_ok=True)
        if npy:
            os.makedirs(os.path.join(platedir, "npy"), exist_ok=True)

    def _load_model(self, warmup: bool = True) -> stardist.models.model2d.StarDist2D:
        """
        Loads the pre-trained StarDist2D model for cell segmentation.

        Parameters
        ----------
        use_gpu : bool, optional
            Whether to use GPU for model inference, by default True.

        Returns
        -------
        StarDist2D
            The loaded StarDist2D model.
        """
        model: stardist.models.model2d.StarDist2D = stardist.models.StarDist2D.from_pretrained("2D_versatile_fluo")

        # Warm-up the model (optional, but can speed up first prediction)
        if warmup:
            for _ in range(10):
                _ = model.predict_instances(numpy.zeros((256, 256), dtype=numpy.float32))

        return model

    def _load_screen(self, config: dict, name: str) -> Screen:
        """
        Loads and returns a Screen instance.

        Parameters
        ----------
        config : dict
            A dictionary containing configuration for the screen.
        name : str
            The name of the screen.

        Returns
        -------
        Screen
            An instance of the Screen class.
        """
        return Screen(config, name)

    # -------------------------------------------------------------------------
    # Image processing and characterization
    # -------------------------------------------------------------------------
    def _segment(
        self,
        image: Image,
        channel: int = 0,
        sigma: float = 1.0,
        min_size: int = 0,
        max_size: int = 1000000,
    ) -> Image:
        """
        Segments the images to identify cells.

        Parameters
        ----------
        image : Image
            The image to segment.
        channel : int, optional
            The channel to process, by default 0.
        sigma : float, optional
            The standard deviation for Gaussian filtering, by default 1.0.
        min_size : int, optional
            The minimum object size (in pixels) to keep, by default 0.
        max_size : int, optional
            The maximum object size (in pixels) to keep, by default 1e6.

        Returns
        -------
        Image
            The segmented image.
        """
        img: numpy.ndarray = image.data[0, 0, channel, :, :]

        # Gaussian smoothing and normalization
        filtered: numpy.ndarray = skimage.filters.gaussian(img, sigma=sigma)

        # Instance segmentation using StarDist
        if self.model is not None:
            labels: numpy.ndarray
            labels, _ = self.model.predict_instances(normalize(filtered))

        if min_size > 0:
            labels = skimage.morphology.remove_small_objects(labels, min_size=int(min_size))

        if max_size < 1e6:
            labels = labels ^ skimage.morphology.remove_small_objects(labels, min_size=int(max_size))

        # Return the segmented image
        segmented = Image()
        segmented.upload(BioImage(labels))

        return segmented

    def _characterize(self, segmented: Image) -> pandas.DataFrame:
        """
        Counts and characterizes cells in the given image.

        Parameters
        ----------
        segmented : Image
            The segmented image to analyze.

        Returns
        -------
        pandas.DataFrame
            A DataFrame containing properties of the counted cells.
        """
        props = skimage.measure.regionprops_table(
            segmented.data[0, 0, 0, :, :],
            properties=[
                "label",
                "area",
                "perimeter",
                "eccentricity",
            ],
        )

        return pandas.DataFrame(props)

    def _analyze_well(self, plate: Plate, well: Well, parameters: dict, npy: bool, instances: bool) -> tuple:
        """
        Analyzes a single well in a plate.

        Parameters
        ----------
        plate : Plate
            The plate containing the well.
        well : Well
            The well to analyze.
        parameters : dict
            A dictionary containing analysis parameters.
        npy : bool
            If True, saves instance segmentation masks as .npy files.
        instances : bool
            If True, saves instance segmentation masks as .png files.

        Returns
        -------
        tuple
            A tuple containing the number of cells and their properties.
        """
        # Characterizations to be calculated
        ncells: int = 0
        properties: list[pandas.DataFrame] = []

        n = [] * len(well.images)
        for image in well.images:
            if self.verbose:
                print(f"{image.filename}", end=" ")

            # Pre-processing and Segmentation
            segmented: Image = self._segment(
                image,
                channel=parameters.get("channel", 0),
                sigma=parameters.get("sigma", 1.0),
                min_size=parameters.get("min_size", 0.0),
                max_size=parameters.get("max_size", 1e6),
            )

            # Basename for saving files
            if instances or npy:
                basename: str = os.path.splitext(os.path.basename(image.filename))[0]
            platedir = os.path.join(self.basedir, self.screen.name, plate.name)

            # Save .png file
            if instances:
                filename: str = os.path.join(platedir, "instances", f"{basename}.png")
                self._save_instances(image, segmented, filename)

            # Save .npy file
            if npy:
                filename = os.path.join(platedir, "npy", f"{basename}.npy")
                self._save_npy(segmented, filename)

            # Cell properties characterization
            props: pandas.DataFrame = self._characterize(segmented)
            props["screen"] = self.screen.name
            props["plate"] = plate.name
            props["well"] = well.name
            props["field"] = image.field
            properties.append(props)

            # Cell counting
            n.append(int(props["label"].count()))

        # Merge fields
        if parameters.get("merge", "sum") == "sum":
            ncells = sum(n)

        return ncells, pandas.concat(properties, ignore_index=True)

    def _analyze_plate(
        self, plate: Plate, parameters: dict, npy: bool, instances: bool
    ) -> tuple[pandas.DataFrame, pandas.DataFrame]:
        """
        Analyzes a single plate in the screening.

        Parameters
        ----------
        plate : Plate
            The plate to analyze.
        parameters : dict
            A dictionary containing analysis parameters.
        npy : bool
            If True, saves instance segmentation masks as .npy files.
        instances : bool
            If True, saves instance segmentation masks as .png files.

        Returns
        -------
        tuple[pandas.DataFrame, pandas.DataFrame]
            A tuple containing the number of cells and their properties.
        """
        # Characterizations to be calculated
        ncells: dict[str, int] = {}
        properties: list[pandas.DataFrame] = []

        for well in plate.wells:
            if self.verbose:
                print(f"> Analyzing well {well.name} ...")

            # Analyze the well
            ncells[well.name], props = self._analyze_well(plate, well, parameters, npy, instances)
            properties.append(props)

            if self.verbose:
                print("\n", end="", flush=True)

        # Convert cell counting to DataFrame
        df_ncells: pandas.DataFrame = (
            pandas.DataFrame.from_dict(ncells, orient="index", columns=["ncells"])
            .reset_index()
            .rename(columns={"index": "well"})
        )
        df_ncells.insert(0, "plate", plate.name)

        # Convert properties to DataFrame
        df_properties: pandas.DataFrame = pandas.concat(properties, ignore_index=True)

        return df_ncells, df_properties

    # -------------------------------------------------------------------------
    # Saving
    # -------------------------------------------------------------------------
    def _save_npy(self, image: Image, filename: str) -> None:
        """
        Saves the given image as a .npy file.

        Parameters
        ----------
        image : Image
            The image to save.
        filename : str
            The filename to save the .npy file.
        """
        numpy.save(filename, image.data)

    def _save_instances(self, image: Image, segmented: Image, filename: str) -> None:
        """
        Saves the instance segmentation mask as a .png file.

        Parameters
        ----------
        image : Image
            The original image.
        segmented : Image
            The segmented image to save.
        filename : str
            The filename to save the .png file.
        """
        # Get segmented channel
        channel = self.config["parameters"].get("channel", 0)

        plt.figure()
        plt.imshow(render_label(segmented.data[0, 0, 0, :, :], img=image.data[0, 0, channel, :, :]))
        plt.axis("off")
        plt.savefig(filename, bbox_inches="tight", pad_inches=0)
        plt.close()

    # -------------------------------------------------------------------------
    # Post-processing: Normalization (inCPE), Z-score and hit selecion
    # -------------------------------------------------------------------------
    def _zscore(self, ncells: pandas.DataFrame) -> float:
        """
        Applies Z-score normalization to the cell counting.

        Parameters
        ----------
        ncells : pandas.DataFrame
            The DataFrame containing cell counting.

        Returns
        -------
        float
            The Z-score value.

        Note
        ----
        Z-score formula: Z' = 1 - (3 * (sp + sn)) / |mp - mn|
        where sp and sn are the standard deviations of the positive and
        negative controls, and mp and mn are their means.
        """
        # Select positive and negative controls
        pos: pandas.Series = ncells.loc[ncells["well"].isin(self.config["controls"]["positive"]), "ncells"]
        neg: pandas.Series = ncells.loc[ncells["well"].isin(self.config["controls"]["negative"]), "ncells"]

        # Calculate Z-score for plate
        zscore: float = 1 - (3 * (pos.std() + neg.std())) / abs(pos.mean() - neg.mean())

        return zscore

    def _normalization(self, ncells: pandas.DataFrame) -> pandas.DataFrame:
        """
        Applies inCPE normalization to the cell counting.

        The inCPE (inhibition of cytopathic effect) is calculated as:

            inCPE = (Ncells - mean(Zpos)) / (mean(Zneg) - mean(Zpos))

        where:
            - Zpos: positive control wells (infected, untreated)
            - Zneg: negative control wells (non-infected, untreated)

        Parameters
        ----------
        ncells : pandas.DataFrame
            The DataFrame containing cell counts. Must include columns:
            'well' and 'ncells'.

        Returns
        -------
        pandas.DataFrame
            The DataFrame with an additional column 'inCPE'.
        """
        # Select positive and negative controls
        pos: pandas.Series = ncells.loc[ncells["well"].isin(self.config["controls"]["positive"]), "ncells"]
        neg: pandas.Series = ncells.loc[ncells["well"].isin(self.config["controls"]["negative"]), "ncells"]

        # Compute inCPE normalization
        ncells["inCPE"] = (ncells["ncells"] - pos.mean()) / (neg.mean() - pos.mean())

        return ncells

    def _filter_candidates(
        self,
        data: dict[str, pandas.DataFrame],
        zscore_per_plate: pandas.DataFrame,
        zscore: float = 0.5,
        incpe: float = 0.3,
    ) -> pandas.DataFrame:
        """
        Filters candidate hits based on Z-score and inCPE thresholds.

        Parameters
        ----------
        data : dict[str,pandas.DataFrame]
            The dictionary containing cell counting and inCPE values for each plate.
        zscore_per_plate : pandas.DataFrame
            The DataFrame containing Z-scores per plate.
        zscore : float, optional
            The Z-score threshold for hit selection, by default 0.5.
        incpe : float, optional
            The inCPE threshold for hit selection, by default 0.3.

        Returns
        -------
        pandas.DataFrame
            A DataFrame containing the filtered candidate hits.
        """
        # Get approved plates based on Z-score
        approved: list[str] = zscore_per_plate.loc[zscore_per_plate["zscore"] >= zscore, "plate"].tolist()

        # Filter hits based on inCPE and approved plates
        hits: pandas.DataFrame = pandas.DataFrame()
        for plate in approved:
            # Get all data from approved plate
            plate_data: pandas.DataFrame = data[plate]

            # Remove controls from data
            plate_data = plate_data.loc[
                ~plate_data["well"].isin(self.config["controls"]["negative"])
                & ~plate_data["well"].isin(self.config["controls"]["positive"])
            ]

            # Select hits based on incpe
            plate_hits = plate_data[plate_data["inCPE"] >= incpe]
            hits = pandas.concat([hits, plate_hits], ignore_index=True)

        # Print approval rate
        print(f"> Approved plates: {len(approved)} ({(len(approved) / len(data)) * 100:.2f}%)")

        # Print number of hits
        print(f"> Number of hits: {len(hits)}")

        return hits

    # -------------------------------------------------------------------------
    # Execution
    # -------------------------------------------------------------------------
    def execute(
        self, npy: bool = False, instances: bool = False
    ) -> tuple[pandas.DataFrame, pandas.DataFrame, dict[str, pandas.DataFrame], dict[str, pandas.DataFrame]]:
        """
        Executes the cell viability analysis protocol.

        Parameters
        ----------
        npy : bool, optional
            If True, saves instance segmentation masks as .npy files, by default False.
        instances : bool, optional
            If True, saves instance segmentation masks as .png files, by default False.

        Returns
        -------
        pandas.DataFrame
            A DataFrame containing the filtered candidate hits.
        pandas.DataFrame
            A DataFrame containing Z-scores for all plates.
        dict[str, pandas.DataFrame]
            A dictionary with plate names as keys and their corresponding
            analysis results as pandas DataFrames.
        dict[str, pandas.DataFrame]
            A dictionary with plate names as keys and their corresponding
            morphology properties as pandas DataFrames.

        Notes
        -----
        The method processes each plate in the screening, analyzes each well,
        and applies the specified cell viability analysis protocol.
        """
        # Load the model if not already loaded
        if self.model is None:
            print("> Loading StarDist model ...")
            self.model = self._load_model(warmup=True)

        # Initialize ncells and morphology dictionary
        ncells: dict[str, pandas.DataFrame] = {}
        morphology: dict[str, pandas.DataFrame] = {}
        zscore: dict[str, float] = {}

        # Iterate over plates, wells, and images
        for plate in self.screen.plates:
            if self.verbose:
                print(f"> Analyzing plate {plate.name} ...")

            # Create directories for the plate
            self._create_directories(plate=plate, instances=instances, npy=npy)

            # Analyze the plate
            ncells[plate.name], properties = self._analyze_plate(
                plate=plate, parameters=self.config["parameters"], npy=npy, instances=instances
            )
            morphology[plate.name] = properties

            # Analyze zcore for the plate
            zscore[plate.name] = self._zscore(ncells[plate.name])

            # Save status file
            with open(os.path.join(self.basedir, self.screen.name, plate.name, "status"), "w") as f:
                if zscore[plate.name] >= 0.5:
                    f.write("SUCCESS")
                else:
                    print(f"Warning: Z-score {zscore[plate.name]:.2f} is below the cutoff of 0.5.")
                    f.write("FAILED")

            # Normalization (inCPE)
            ncells[plate.name] = self._normalization(ncells[plate.name])

            # Plate map visualization
            plate_map(
                filename=os.path.join(self.basedir, self.screen.name, plate.name, "ncells.html"),
                data=ncells[plate.name],
                colname="ncells",
                controls=self.config["controls"],
            )
            plate_map(
                filename=os.path.join(self.basedir, self.screen.name, plate.name, "inCPE.html"),
                data=ncells[plate.name],
                colname="inCPE",
                controls=self.config["controls"],
            )

            # Save morphology as Excel file
            morphology[plate.name].to_csv(
                f"{self.basedir}/{self.screen.name}/{plate.name}/morphology.csv.gz", index=False, compression="gzip"
            )

        # Combine all z-scores into a DataFrame
        zscore_per_plate = pandas.DataFrame(list(zscore.items()), columns=["plate", "zscore"])

        # Filter hits
        hits: pandas.DataFrame = self._filter_candidates(
            ncells,
            zscore_per_plate,
            zscore=self.config["filter"].get("zscore", 0.5),
            incpe=self.config["filter"].get("zscore", 0.3),
        )

        # Save ncells as multi-sheet Excel file
        with pandas.ExcelWriter(f"{self.basedir}/{self.screen.name}/summary.xlsx", engine="openpyxl") as writer:
            zscore_per_plate.to_excel(writer, sheet_name="Z-score", index=False)
            for plate in self.screen.plates:
                ncells[plate.name].to_excel(writer, sheet_name=plate.name, index=False)

        # Save hits to Excel file
        with pandas.ExcelWriter(f"{self.basedir}/{self.screen.name}/hits.xlsx", engine="openpyxl") as writer:
            hits.to_excel(writer, sheet_name="Hits", index=False)

        # Unload model from memory
        self.model = None

        return hits, zscore_per_plate, ncells, morphology

execute(npy=False, instances=False)

Executes the cell viability analysis protocol.

Parameters

npy : bool, optional If True, saves instance segmentation masks as .npy files, by default False. instances : bool, optional If True, saves instance segmentation masks as .png files, by default False.

Returns

pandas.DataFrame A DataFrame containing the filtered candidate hits. pandas.DataFrame A DataFrame containing Z-scores for all plates. dict[str, pandas.DataFrame] A dictionary with plate names as keys and their corresponding analysis results as pandas DataFrames. dict[str, pandas.DataFrame] A dictionary with plate names as keys and their corresponding morphology properties as pandas DataFrames.

Notes

The method processes each plate in the screening, analyzes each well, and applies the specified cell viability analysis protocol.

Source code in CellViability/protocols/cv.py
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
def execute(
    self, npy: bool = False, instances: bool = False
) -> tuple[pandas.DataFrame, pandas.DataFrame, dict[str, pandas.DataFrame], dict[str, pandas.DataFrame]]:
    """
    Executes the cell viability analysis protocol.

    Parameters
    ----------
    npy : bool, optional
        If True, saves instance segmentation masks as .npy files, by default False.
    instances : bool, optional
        If True, saves instance segmentation masks as .png files, by default False.

    Returns
    -------
    pandas.DataFrame
        A DataFrame containing the filtered candidate hits.
    pandas.DataFrame
        A DataFrame containing Z-scores for all plates.
    dict[str, pandas.DataFrame]
        A dictionary with plate names as keys and their corresponding
        analysis results as pandas DataFrames.
    dict[str, pandas.DataFrame]
        A dictionary with plate names as keys and their corresponding
        morphology properties as pandas DataFrames.

    Notes
    -----
    The method processes each plate in the screening, analyzes each well,
    and applies the specified cell viability analysis protocol.
    """
    # Load the model if not already loaded
    if self.model is None:
        print("> Loading StarDist model ...")
        self.model = self._load_model(warmup=True)

    # Initialize ncells and morphology dictionary
    ncells: dict[str, pandas.DataFrame] = {}
    morphology: dict[str, pandas.DataFrame] = {}
    zscore: dict[str, float] = {}

    # Iterate over plates, wells, and images
    for plate in self.screen.plates:
        if self.verbose:
            print(f"> Analyzing plate {plate.name} ...")

        # Create directories for the plate
        self._create_directories(plate=plate, instances=instances, npy=npy)

        # Analyze the plate
        ncells[plate.name], properties = self._analyze_plate(
            plate=plate, parameters=self.config["parameters"], npy=npy, instances=instances
        )
        morphology[plate.name] = properties

        # Analyze zcore for the plate
        zscore[plate.name] = self._zscore(ncells[plate.name])

        # Save status file
        with open(os.path.join(self.basedir, self.screen.name, plate.name, "status"), "w") as f:
            if zscore[plate.name] >= 0.5:
                f.write("SUCCESS")
            else:
                print(f"Warning: Z-score {zscore[plate.name]:.2f} is below the cutoff of 0.5.")
                f.write("FAILED")

        # Normalization (inCPE)
        ncells[plate.name] = self._normalization(ncells[plate.name])

        # Plate map visualization
        plate_map(
            filename=os.path.join(self.basedir, self.screen.name, plate.name, "ncells.html"),
            data=ncells[plate.name],
            colname="ncells",
            controls=self.config["controls"],
        )
        plate_map(
            filename=os.path.join(self.basedir, self.screen.name, plate.name, "inCPE.html"),
            data=ncells[plate.name],
            colname="inCPE",
            controls=self.config["controls"],
        )

        # Save morphology as Excel file
        morphology[plate.name].to_csv(
            f"{self.basedir}/{self.screen.name}/{plate.name}/morphology.csv.gz", index=False, compression="gzip"
        )

    # Combine all z-scores into a DataFrame
    zscore_per_plate = pandas.DataFrame(list(zscore.items()), columns=["plate", "zscore"])

    # Filter hits
    hits: pandas.DataFrame = self._filter_candidates(
        ncells,
        zscore_per_plate,
        zscore=self.config["filter"].get("zscore", 0.5),
        incpe=self.config["filter"].get("zscore", 0.3),
    )

    # Save ncells as multi-sheet Excel file
    with pandas.ExcelWriter(f"{self.basedir}/{self.screen.name}/summary.xlsx", engine="openpyxl") as writer:
        zscore_per_plate.to_excel(writer, sheet_name="Z-score", index=False)
        for plate in self.screen.plates:
            ncells[plate.name].to_excel(writer, sheet_name=plate.name, index=False)

    # Save hits to Excel file
    with pandas.ExcelWriter(f"{self.basedir}/{self.screen.name}/hits.xlsx", engine="openpyxl") as writer:
        hits.to_excel(writer, sheet_name="Hits", index=False)

    # Unload model from memory
    self.model = None

    return hits, zscore_per_plate, ncells, morphology

Image

This class is used for an image of a well.

Attributes

_image : BioImage | None The BioImage object representing the image. filename : str | None The path to the image file. image : BioImage The BioImage object representing the image.

Source code in CellViability/screening/image.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
class Image:
    """This class is used for an image of a well.

    Attributes
    ----------
    _image : BioImage | None
        The BioImage object representing the image.
    filename : str | None
        The path to the image file.
    image : BioImage
        The BioImage object representing the image.
    """

    def __init__(self) -> None:
        """
        Initialize the Image object.
        """
        self.field: int | None = None
        self.filename: str | None = None
        self._image: BioImage | None = None

    def __str__(self) -> str:
        return f"<CellViability.screening.image.Image `{self.filename}` object at {hex(id(self))}>"

    def __repr__(self) -> str:
        return f"<CellViability.screening.image.Image `{self.filename}` object at {hex(id(self))}>"

    def lazyload(self, field: int, filename: str) -> "Image":
        """
        Lazily load image metadata (field and filename) without loading the image data.

        Parameters
        ----------
        field : int
            The field number (must be positive).
        filename : str
            Path to the image file.

        Raises
        ------
        FileNotFoundError
            If the image file is not found.
        ValueError
            If the field number is not positive.
        ValueError
            If the filename is not a valid string.
        """
        if not isinstance(field, int) or field < 1:
            raise ValueError("Field number must be a positive integer.")
        if not isinstance(filename, str) or not filename:
            raise ValueError("Filename must be a non-empty string.")
        if not os.path.exists(filename):
            raise FileNotFoundError(f"Image '{filename}' not found.")

        self.field = field
        self.filename = filename
        self._image = None

        return self

    def upload(self, image: BioImage) -> None:
        """
        Upload an image to this object.

        Parameters
        ----------
        image : BioImage
            The image to be saved.
        """
        self._image = image
        self.field = None
        self.filename = None

    @property
    def data(self) -> numpy.ndarray:
        """Return the image data as a numpy array.

        Returns
        -------
        numpy.ndarray
            The image data as a numpy array.
        """
        if self._image is None:
            if self.filename is None:
                raise ValueError("No image loaded or filename provided.")
            self._image = BioImage(self.filename)
        return self._image.data

    @property
    def image(self) -> BioImage:
        """Return the BioImage object.

        Returns
        -------
        BioImage
            The BioImage object representing the image.
        """
        if self._image is None:
            if self.filename is None:
                raise ValueError("No image loaded or filename provided.")
            self._image = BioImage(self.filename)
        return self._image

data property

Return the image data as a numpy array.

Returns

numpy.ndarray The image data as a numpy array.

image property

Return the BioImage object.

Returns

BioImage The BioImage object representing the image.

__init__()

Initialize the Image object.

Source code in CellViability/screening/image.py
30
31
32
33
34
35
36
def __init__(self) -> None:
    """
    Initialize the Image object.
    """
    self.field: int | None = None
    self.filename: str | None = None
    self._image: BioImage | None = None

lazyload(field, filename)

Lazily load image metadata (field and filename) without loading the image data.

Parameters

field : int The field number (must be positive). filename : str Path to the image file.

Raises

FileNotFoundError If the image file is not found. ValueError If the field number is not positive. ValueError If the filename is not a valid string.

Source code in CellViability/screening/image.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def lazyload(self, field: int, filename: str) -> "Image":
    """
    Lazily load image metadata (field and filename) without loading the image data.

    Parameters
    ----------
    field : int
        The field number (must be positive).
    filename : str
        Path to the image file.

    Raises
    ------
    FileNotFoundError
        If the image file is not found.
    ValueError
        If the field number is not positive.
    ValueError
        If the filename is not a valid string.
    """
    if not isinstance(field, int) or field < 1:
        raise ValueError("Field number must be a positive integer.")
    if not isinstance(filename, str) or not filename:
        raise ValueError("Filename must be a non-empty string.")
    if not os.path.exists(filename):
        raise FileNotFoundError(f"Image '{filename}' not found.")

    self.field = field
    self.filename = filename
    self._image = None

    return self

upload(image)

Upload an image to this object.

Parameters

image : BioImage The image to be saved.

Source code in CellViability/screening/image.py
77
78
79
80
81
82
83
84
85
86
87
88
def upload(self, image: BioImage) -> None:
    """
    Upload an image to this object.

    Parameters
    ----------
    image : BioImage
        The image to be saved.
    """
    self._image = image
    self.field = None
    self.filename = None

Plate

This class is used for a plate in a screening experiment.

Attributes

datadir : str The directory where the plate data is stored. name : str The name of the plate. wells : list of Well The list of wells in the plate.

Source code in CellViability/screening/plate.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
class Plate:
    """This class is used for a plate in a screening experiment.

    Attributes
    ----------
    datadir : str
        The directory where the plate data is stored.
    name : str
        The name of the plate.
    wells : list of Well
        The list of wells in the plate.
    """

    def __init__(self, config: dict, name: str, datadir: str):
        """
        Initialize the Plate object.

        Parameters
        ----------
        config : dict
            The configuration dictionary.
        name : str
            The name of the plate.
        datadir : str
            Path to the directory containing plate images.

        Raises
        ------
        ValueError
            If the data directory is not found.
        ValueError
            If the number of wells does not match the expected number from config.
        """
        # Process plate name
        self.name: str = name

        # Get data directory from the configuration
        if not os.path.exists(datadir):
            raise ValueError(f"Data directory '{datadir}' not found in the configuration.")
        self.datadir: str = datadir

        # Load wells
        self.wells: list[Well] = self._load_wells(config)

        # Check if number of wells matches expected number from config
        if len(self.wells) != config.get("wells"):
            raise ValueError(
                f"Number of wells in plate '{self.name}' ({len(self.wells)}) does not match expected number ({config.get('wells')})."
            )

    def __str__(self) -> str:
        return f"<CellViability.screening.plate.Plate `{self.name}` object at {hex(id(self))}>"

    def __repr__(self) -> str:
        return f"<CellViability.screening.plate.Plate `{self.name}` object at {hex(id(self))}>"

    def _load_wells(self, config: dict) -> list[Well]:
        """
        Load wells from the plate configuration.

        Parameters
        ----------
        config : dict
            The configuration dictionary.

        Returns
        -------
        list[Well]
            The list of wells in the plate.
        """
        raw: dict[str, list[str]] = {}

        # Get files in datadir
        for filename in sorted(os.listdir(self.datadir)):
            if filename.endswith(".tif"):
                # Get metadata from filename
                # Columbus 2.4.0.104236 build format
                # Example: '001001-1-001001001.tif
                row, column, _, _ = _get_metadata_from_filename(filename)
                well: str = f"{row}{column:02d}"

                # Group images by well
                raw.setdefault(well, []).append(os.path.join(self.datadir, filename))

        # Sort wells by row and column
        sorted_wells: list[str] = sorted(raw.keys(), key=well_sort)

        # Load wells in sorted order
        wells: list[Well] = [Well(config, wellname, raw[wellname]) for wellname in sorted_wells]

        return wells

    def well(self, name: str) -> Well:
        """Return a well object.

        Parameters
        ----------
        name : str
            The name of the well. Examples: 'A01', 'B12', etc.

        Returns
        -------
        Well
            The well object.
        """
        # Check if well is loaded
        if len(self.wells) == 0:
            raise ValueError("Wells not loaded. Please load wells first.")

        # Check if well index is out of bounds
        if name not in [well.name for well in self.wells]:
            raise ValueError(f"Well {name} does not exist in plate.")

        # Get index of well in wells
        idx: int = [well.name for well in self.wells].index(name)

        return self.wells[idx]

__init__(config, name, datadir)

Initialize the Plate object.

Parameters

config : dict The configuration dictionary. name : str The name of the plate. datadir : str Path to the directory containing plate images.

Raises

ValueError If the data directory is not found. ValueError If the number of wells does not match the expected number from config.

Source code in CellViability/screening/plate.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
def __init__(self, config: dict, name: str, datadir: str):
    """
    Initialize the Plate object.

    Parameters
    ----------
    config : dict
        The configuration dictionary.
    name : str
        The name of the plate.
    datadir : str
        Path to the directory containing plate images.

    Raises
    ------
    ValueError
        If the data directory is not found.
    ValueError
        If the number of wells does not match the expected number from config.
    """
    # Process plate name
    self.name: str = name

    # Get data directory from the configuration
    if not os.path.exists(datadir):
        raise ValueError(f"Data directory '{datadir}' not found in the configuration.")
    self.datadir: str = datadir

    # Load wells
    self.wells: list[Well] = self._load_wells(config)

    # Check if number of wells matches expected number from config
    if len(self.wells) != config.get("wells"):
        raise ValueError(
            f"Number of wells in plate '{self.name}' ({len(self.wells)}) does not match expected number ({config.get('wells')})."
        )

well(name)

Return a well object.

Parameters

name : str The name of the well. Examples: 'A01', 'B12', etc.

Returns

Well The well object.

Source code in CellViability/screening/plate.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
def well(self, name: str) -> Well:
    """Return a well object.

    Parameters
    ----------
    name : str
        The name of the well. Examples: 'A01', 'B12', etc.

    Returns
    -------
    Well
        The well object.
    """
    # Check if well is loaded
    if len(self.wells) == 0:
        raise ValueError("Wells not loaded. Please load wells first.")

    # Check if well index is out of bounds
    if name not in [well.name for well in self.wells]:
        raise ValueError(f"Well {name} does not exist in plate.")

    # Get index of well in wells
    idx: int = [well.name for well in self.wells].index(name)

    return self.wells[idx]

Screen

This class is used for a screen .

Attributes

datadir : str The directory where the screening data is stored. name : str The name of the screening. plates : list[Plate] The list of plates in the screening.

Source code in CellViability/screening/screen.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
class Screen:
    """This class is used for a screen .

    Attributes
    ----------
    datadir : str
        The directory where the screening data is stored.
    name : str
        The name of the screening.
    plates : list[Plate]
        The list of plates in the screening.
    """

    def __init__(self, config: dict, name: str):
        """
        Initialize the Screen object.

        Parameters
        ----------
        config : dict
            The configuration dictionary.
        name : str
            The name of the screening.

        Raises
        ------
        ValueError
            If the data directory is not found in the configuration.
        ValueError
            If the data directory does not exist.
        """
        # Get screen name
        self.name: str = name

        # Get data directory from the configuration
        datadir: str | None = config.get("datadir")
        if datadir is None:
            raise ValueError("Data directory not found in the configuration.")
        elif not os.path.exists(datadir):
            raise ValueError(f"Data directory '{datadir}' does not exist.")
        else:
            self.datadir: str = datadir

        # Load plates from the experiment configuration
        self.plates: list[Plate] = self._load_plates(config)

    def __str__(self) -> str:
        return f"<CellViability.screening.Screen `{self.name}` object at {hex(id(self))}>"

    def __repr__(self) -> str:
        return f"<CellViability.screening.Screen `{self.name}` object at {hex(id(self))}>"

    def _load_plates(self, config: dict) -> list[Plate]:
        """
        Load plates from the experiment configuration.

        Parameters
        ----------
        config : dict
            The configuration dictionary.

        Returns
        -------
        list[Plate]
            The list of plates in the screening.
        """
        plates: list[Plate] = []

        platenames: list[str] = config.get("plates", [])
        for platename in platenames:
            # Get the data directory for the plate
            path: str = os.path.join(self.datadir, platename)

            # Create a Plate object and add it to the list
            plates.append(Plate(config, platename, path))

        return plates

    def plate(self, idx: int | str) -> Plate:
        """
        Return a plate object from plates attributes.

        Parameters
        ----------
        idx : int | str
            Index or name of the plate in plates attributes.
        """
        # Check if plate index is out of bounds
        if isinstance(idx, int):
            if idx < 0 or idx >= len(self.plates):
                raise ValueError(f"Plate index '{idx}' out of bounds.")
            return self.plates[idx]
        elif isinstance(idx, str):
            for plate in self.plates:
                if plate.name == idx:
                    return plate
            raise ValueError(f"Plate name '{idx}' not found.")

__init__(config, name)

Initialize the Screen object.

Parameters

config : dict The configuration dictionary. name : str The name of the screening.

Raises

ValueError If the data directory is not found in the configuration. ValueError If the data directory does not exist.

Source code in CellViability/screening/screen.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def __init__(self, config: dict, name: str):
    """
    Initialize the Screen object.

    Parameters
    ----------
    config : dict
        The configuration dictionary.
    name : str
        The name of the screening.

    Raises
    ------
    ValueError
        If the data directory is not found in the configuration.
    ValueError
        If the data directory does not exist.
    """
    # Get screen name
    self.name: str = name

    # Get data directory from the configuration
    datadir: str | None = config.get("datadir")
    if datadir is None:
        raise ValueError("Data directory not found in the configuration.")
    elif not os.path.exists(datadir):
        raise ValueError(f"Data directory '{datadir}' does not exist.")
    else:
        self.datadir: str = datadir

    # Load plates from the experiment configuration
    self.plates: list[Plate] = self._load_plates(config)

plate(idx)

Return a plate object from plates attributes.

Parameters

idx : int | str Index or name of the plate in plates attributes.

Source code in CellViability/screening/screen.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def plate(self, idx: int | str) -> Plate:
    """
    Return a plate object from plates attributes.

    Parameters
    ----------
    idx : int | str
        Index or name of the plate in plates attributes.
    """
    # Check if plate index is out of bounds
    if isinstance(idx, int):
        if idx < 0 or idx >= len(self.plates):
            raise ValueError(f"Plate index '{idx}' out of bounds.")
        return self.plates[idx]
    elif isinstance(idx, str):
        for plate in self.plates:
            if plate.name == idx:
                return plate
        raise ValueError(f"Plate name '{idx}' not found.")

Well

This class is used for a well in a plate.

Attributes

images : list[Image] The list of images in the well. name : str The name of the well.

Source code in CellViability/screening/well.py
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
class Well:
    """This class is used for a well in a plate.

    Attributes
    ----------
    images : list[Image]
        The list of images in the well.
    name : str
        The name of the well.
    """

    def __init__(self, config: dict, name: str, filenames: list[str]):
        """
        Initialize the Well object.

        Parameters
        ----------
        config : dict
            The configuration dictionary.
        name : str
            The name of the well.
        filenames : List[str]
            List of paths to the images in the well.

        Raises
        ------
        ValueError
            If the number of images does not match the expected number from config.
        """
        # Process well name
        self.name: str = name

        # Load images
        self.images: list[Image] = self._load_images(filenames)

        # Check if number of images matches expected number from config
        if len(self.images) != config.get("fields"):
            raise ValueError(
                f"Number of images in well '{self.name}' ({len(self.images)}) does not match expected number ({config.get('fields')})."
            )

    def __str__(self) -> str:
        return f"<CellViability.screening.well.Well `{self.name}` object at {hex(id(self))}>"

    def __repr__(self) -> str:
        return f"<CellViability.screening.well.Well `{self.name}` object at {hex(id(self))}>"

    def _load_images(self, filenames: list[str]) -> list[Image]:
        """
        Load images from the well configuration.

        Parameters
        ----------
        filenames : list[str]
            List of paths to the images in the well.

        Returns
        -------
        list[Image]
            The list of images in the well.
        """
        # Load images from the well configuration
        images = []
        for filename in sorted(filenames):
            # Get metadata from filename
            # Columbus 2.4.0.104236 build format
            # Example: '001001-1-001001001.tif
            _, _, field, _ = _get_metadata_from_filename(filename)

            # Sort images by field
            images.append((field, filename))

        # Sort images by field number
        images.sort(key=lambda x: x[0])

        return [Image().lazyload(field, filename) for field, filename in images]

    def image(self, field: int) -> Image:
        """Return an image object.

        Parameters
        ----------
        field : int
            The field number. Examples: 1, 2, 3, etc.

        Returns
        -------
        Image
            The image object.

        Raises
        ------
        ValueError
            If images are not loaded or field does not exist.

        """
        # Check if well is loaded
        if len(self.images) == 0:
            raise ValueError("Images not loaded. Please load images first.")

        # Check if well index is out of bounds
        if field not in [image.field for image in self.images]:
            raise ValueError(f"Field #{field} does not exist in plate.")

        # Get index of well in wells
        idx = [image.field for image in self.images].index(field)

        return self.images[idx]

__init__(config, name, filenames)

Initialize the Well object.

Parameters

config : dict The configuration dictionary. name : str The name of the well. filenames : List[str] List of paths to the images in the well.

Raises

ValueError If the number of images does not match the expected number from config.

Source code in CellViability/screening/well.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def __init__(self, config: dict, name: str, filenames: list[str]):
    """
    Initialize the Well object.

    Parameters
    ----------
    config : dict
        The configuration dictionary.
    name : str
        The name of the well.
    filenames : List[str]
        List of paths to the images in the well.

    Raises
    ------
    ValueError
        If the number of images does not match the expected number from config.
    """
    # Process well name
    self.name: str = name

    # Load images
    self.images: list[Image] = self._load_images(filenames)

    # Check if number of images matches expected number from config
    if len(self.images) != config.get("fields"):
        raise ValueError(
            f"Number of images in well '{self.name}' ({len(self.images)}) does not match expected number ({config.get('fields')})."
        )

image(field)

Return an image object.

Parameters

field : int The field number. Examples: 1, 2, 3, etc.

Returns

Image The image object.

Raises

ValueError If images are not loaded or field does not exist.

Source code in CellViability/screening/well.py
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
def image(self, field: int) -> Image:
    """Return an image object.

    Parameters
    ----------
    field : int
        The field number. Examples: 1, 2, 3, etc.

    Returns
    -------
    Image
        The image object.

    Raises
    ------
    ValueError
        If images are not loaded or field does not exist.

    """
    # Check if well is loaded
    if len(self.images) == 0:
        raise ValueError("Images not loaded. Please load images first.")

    # Check if well index is out of bounds
    if field not in [image.field for image in self.images]:
        raise ValueError(f"Field #{field} does not exist in plate.")

    # Get index of well in wells
    idx = [image.field for image in self.images].index(field)

    return self.images[idx]

load_config(filename='config.json')

Loads the configuration from a JSON file.

Parameters

filename : str, optional The path to the configuration file, by default "config.json".

Returns

dict The configuration dictionary.

Source code in CellViability/core/io.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def load_config(filename: str = "config.json") -> dict:
    """
    Loads the configuration from a JSON file.

    Parameters
    ----------
    filename : str, optional
        The path to the configuration file, by default "config.json".

    Returns
    -------
    dict
        The configuration dictionary.
    """
    # Check if config file exists
    if not os.path.exists(filename):
        raise FileNotFoundError(f"Configuration file '{filename}' not found.")

    # Load and return the configuration
    with open(filename) as f:
        config: dict[str, Any] = json.load(f)

    return config

plate_map(filename, data, colname, controls, q_rows=16, q_columns=24)

Plot a plate heatmap with control highlighting and save to HTML.

Parameters

filename : str Output HTML file path. data : pandas.DataFrame Input data. Must contain a 'well' column in format 'A01', 'B12', etc. colname : str Column name to use for heatmap values. controls : dict or list Control wells. Can be either: - dict with keys {'positive': [...], 'negative': [...]} - list of wells treated as negative control q_rows : int, optional Number of rows in the plate (default is 16). q_columns : int, optional Number of columns in the plate (default is 24).

Source code in CellViability/core/visualization.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
def plate_map(
    filename: str, data: pandas.DataFrame, colname: str, controls: dict | list, q_rows: int = 16, q_columns: int = 24
) -> None:
    """
    Plot a plate heatmap with control highlighting and save to HTML.

    Parameters
    ----------
    filename : str
        Output HTML file path.
    data : pandas.DataFrame
        Input data. Must contain a 'well' column in format 'A01', 'B12', etc.
    colname : str
        Column name to use for heatmap values.
    controls : dict or list
        Control wells. Can be either:
        - dict with keys {'positive': [...], 'negative': [...]}
        - list of wells treated as negative control
    q_rows : int, optional
        Number of rows in the plate (default is 16).
    q_columns : int, optional
        Number of columns in the plate (default is 24).
    """
    if "well" not in data.columns:
        raise ValueError("Input data must contain a 'well' column")

    if colname not in data.columns:
        raise ValueError(f"Column '{colname}' not found in data")

    # Work on a copy to avoid mutating the caller's data
    data = data.copy()
    # Extract row and column from well
    data[["row", "column"]] = data["well"].str.extract(r"([A-Za-z]+)(\d+)")
    data["column"] = data["column"].astype(int)

    # Fill missing wells if plate is incomplete
    expected_wells = q_rows * q_columns
    if len(data) != expected_wells:
        rows = list(string.ascii_uppercase[:q_rows])
        columns = list(range(1, q_columns + 1))
        full_grid = pandas.MultiIndex.from_product([rows, columns], names=["row", "column"]).to_frame(index=False)
        data = pandas.merge(full_grid, data, on=["row", "column"], how="left")

    # Pivot to plate format
    plate_matrix = data.pivot(index="row", columns="column", values=colname)

    # Prepare numeric indices for overlays
    data["row_idx"] = data["row"].apply(lambda r: string.ascii_uppercase.index(r))
    data["col_idx"] = data["column"] - 1

    # Parse controls
    positive_wells = controls.get("positive", []) if isinstance(controls, dict) else []
    negative_wells = (
        controls.get("negative", []) if isinstance(controls, dict) else controls if isinstance(controls, list) else []
    )

    data["control_type"] = None
    data.loc[data["well"].isin(positive_wells), "control_type"] = "positive"
    data.loc[data["well"].isin(negative_wells), "control_type"] = "negative"

    # Heatmap
    fig = plotly.graph_objects.Figure(
        data=plotly.graph_objects.Heatmap(
            z=plate_matrix.values,
            x=list(range(1, q_columns + 1)),
            y=list(string.ascii_uppercase[:q_rows]),  # A on top, P at bottom
            hoverinfo="text",
            text=[
                [
                    (
                        f"Well: {r}{c:02}<br>Value: {plate_matrix.loc[r, c]:.2f}"
                        if c in plate_matrix.columns and pandas.notna(plate_matrix.loc[r, c])
                        else f"Well: {r}{c:02}<br>Value: N/A"
                    )
                    for c in plate_matrix.columns
                ]
                for r in plate_matrix.index
            ],
            colorscale="bluered",
            colorbar={
                "title": colname,
                "thickness": 20,  # width of the colorbar in pixels
                "outlinewidth": 2,  # line width around the colorbar
                "outlinecolor": "black",  # line color around the colorbar
            },
            zmin=numpy.nanmin(plate_matrix.values),
            zmax=numpy.nanmax(plate_matrix.values),
        )
    )

    # Black borders for all wells
    for i in range(q_rows):
        for j in range(q_columns):
            fig.add_shape(
                type="rect",
                x0=j + 0.5,
                x1=j + 1.5,
                y0=i - 0.5,
                y1=i + 0.5,
                line={"color": "black", "width": 1},
                fillcolor="rgba(0,0,0,0)",
                xref="x",
                yref="y",
            )

    # Control overlays
    colors = {"negative": "gray", "positive": "green"}
    for _, row in data.dropna(subset=["control_type"]).iterrows():
        fig.add_shape(
            type="rect",
            x0=row["col_idx"] + 0.5,
            x1=row["col_idx"] + 1.5,
            y0=row["row_idx"] - 0.5,
            y1=row["row_idx"] + 0.5,
            line={"color": colors[row["control_type"]], "width": 4},
            fillcolor="rgba(0,0,0,0)",
            xref="x",
            yref="y",
        )

    # Remove titles and axis labels
    fig.update_layout(
        plot_bgcolor="white",
        autosize=False,
        width=max(600, q_columns * 50),  # set width based on number of columns
        height=900,
        margin={"b": 120, "l": 60, "r": 60, "t": 80},
        legend={"orientation": "h", "y": -0.01},
        title={"text": data["plate"].iat[0] if len(data) > 0 and "plate" in data.columns else "Plate", "x": 0.5},
    )
    fig.update_xaxes(
        tickmode="array",
        tickvals=list(range(1, q_columns + 1)),
        ticktext=[str(i) for i in range(1, q_columns + 1)],  # column index
        showticklabels=True,
        side="top",
    )
    fig.update_yaxes(
        autorange="reversed",
        scaleanchor="x",
        scaleratio=1,
        tickmode="array",
        tickvals=list(string.ascii_uppercase[:q_rows]),
        ticktext=list(string.ascii_uppercase[:q_rows]),  # show row letters
        showticklabels=True,
    )

    # Legend traces for controls below heatmap
    legend_items = []
    if negative_wells:
        legend_items.append(
            plotly.graph_objects.Scatter(
                x=[None],
                y=[None],
                mode="markers",
                marker={"color": "white", "symbol": "square", "size": 14, "line": {"color": "gray", "width": 4}},
                name="Negative Control",
            )
        )
    if positive_wells:
        legend_items.append(
            plotly.graph_objects.Scatter(
                x=[None],
                y=[None],
                mode="markers",
                marker={"color": "white", "symbol": "square", "size": 14, "line": {"color": "green", "width": 4}},
                name="Positive Control",
            )
        )

    for trace in legend_items:
        fig.add_trace(trace)

    fig.write_html(filename)