regression

bootstrap_ridge(Rstim, Rresp, Pstim, Presp, alphas, nboots, chunklen, nchunks, dtype=np.single, corrmin=0.2, joined=None, singcutoff=1e-10, normalpha=False, single_alpha=False, use_corr=True, logger=get_logger('ridge_corr'))

Uses ridge regression with a bootstrapped held-out set to get optimal alpha values for each response. [nchunks] random chunks of length [chunklen] will be taken from [Rstim] and [Rresp] for each regression run. [nboots] total regression runs will be performed. The best alpha value for each response will be averaged across the bootstraps to estimate the best alpha for that response.

If [joined] is given, it should be a list of lists where the STRFs for all the voxels in each sublist will be given the same regularization parameter (the one that is the best on average).

Parameters:

Rstim : array_like, shape (TR, N), required
    Training stimuli with TR time points and N features. Each feature should be Z-scored across time.

Rresp : array_like, shape (TR, M), required
    Training responses with TR time points and M different responses (voxels, neurons, what-have-you). Each response should be Z-scored across time.

Pstim : array_like, shape (TP, N), required
    Test stimuli with TP time points and N features. Each feature should be Z-scored across time.

Presp : array_like, shape (TP, M), required
    Test responses with TP time points and M different responses. Each response should be Z-scored across time.

alphas : list or array_like, shape (A,), required
    Ridge parameters that will be tested. Should probably be log-spaced. np.logspace(0, 3, 20) works well.

nboots : int, required
    The number of bootstrap samples to run. 15 to 30 works well.

chunklen : int, required
    On each sample, the training data is broken into chunks of this length. This should be a few times longer than your delay/STRF. E.g. for a STRF with 3 delays, chunks of length 10 work well.

nchunks : int, required
    The number of training chunks held out to test ridge parameters for each bootstrap sample. The product of nchunks and chunklen is the total number of training samples held out per bootstrap, and this product should be about 20 percent of the total length of the training data. For example, with 4000 training time points and chunklen=10, nchunks=80 holds out 800 samples (20 percent).

dtype : np.dtype, default=np.single
    All data will be cast to this dtype for computation. np.single is the default for memory efficiency, as np.double will thrash most machines on a big problem. If you want to do regression on complex variables, change this to np.complex128.

corrmin : float in [0, 1], default=0.2
    Purely for display purposes. After each alpha is tested for each bootstrap sample, the number of responses with correlation greater than this value will be printed. For long-running regressions this can give a rough sense of how well the model works before it's done.

joined : None or list of array_like indices, default=None
    If you want the STRFs for two (or more) responses to be directly comparable, you need to ensure that they use the same regularization parameter. To do that, supply a list of the response sets that should share a ridge parameter. For example, if you have four responses, joined could be [np.array([0, 1]), np.array([2, 3])], in which case responses 0 and 1 will use the same ridge parameter (the one that is best on average for those two), and likewise for responses 2 and 3.

singcutoff : float, default=1e-10
    The first step in ridge regression is computing the singular value decomposition (SVD) of the stimulus Rstim. If Rstim is not full rank, some singular values will be approximately equal to zero and the corresponding singular vectors will be noise. These singular values/vectors should be removed both for speed (the fewer multiplications the better!) and accuracy. Any singular values less than singcutoff will be removed.

normalpha : boolean, default=False
    Whether ridge parameters (alphas) should be normalized by the Frobenius norm of Rstim. Good for rigorously comparing models with different numbers of parameters.

single_alpha : boolean, default=False
    Whether to use a single alpha for all responses. Good for identification/decoding.

use_corr : boolean, default=True
    If True, this function will use correlation as its metric of model fit. If False, it will instead use variance explained (R-squared). For ridge regression this can make a big difference: highly regularized solutions have very small norms and will thus explain very little variance while still leading to high correlations, as correlation is scale-free while R**2 is not.

Returns:

wt : array_like, shape (N, M)
    Regression weights for N features and M responses.

corrs : array_like, shape (M,)
    Validation set correlations. Predicted responses for the validation set are obtained using the regression weights: pred = np.dot(Pstim, wt), and then the correlation between each predicted response and each column in Presp is found.

alphas : array_like, shape (M,)
    The regularization coefficient (alpha) selected for each voxel using bootstrap cross-validation.

bootstrap_corrs : array_like, shape (A, M, B)
    Correlation between predicted and actual responses on randomly held out portions of the training set, for each of A alphas, M voxels, and B bootstrap samples.

valinds : array_like, shape (TH, B)
    The indices of the training data that were used as "validation" for each bootstrap sample.
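
A minimal usage sketch with synthetic, Z-scored data. The import path encoders.regression and all shapes below are illustrative assumptions, not part of the documented API:

import numpy as np

from encoders.regression import bootstrap_ridge  # assumed import path for src/encoders/regression.py

rng = np.random.default_rng(0)
TR, TP, N, M = 1000, 200, 50, 10

# Synthetic stimuli/responses; real inputs should be Z-scored across time.
Rstim = rng.standard_normal((TR, N))
Rresp = rng.standard_normal((TR, M))
Pstim = rng.standard_normal((TP, N))
Presp = rng.standard_normal((TP, M))

# chunklen * nchunks = 200 held-out time points per bootstrap, i.e. 20% of TR
wt, corrs, valphas, bscorrs, valinds = bootstrap_ridge(
    Rstim, Rresp, Pstim, Presp,
    alphas=np.logspace(0, 3, 20),
    nboots=15, chunklen=10, nchunks=20,
)
print(wt.shape, corrs.shape, bscorrs.shape)  # (50, 10) (10,) (20, 10, 15)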

Source code in src/encoders/regression.py
def bootstrap_ridge(
    Rstim,
    Rresp,
    Pstim,
    Presp,
    alphas,
    nboots,
    chunklen,
    nchunks,
    dtype=np.single,
    corrmin=0.2,
    joined=None,
    singcutoff=1e-10,
    normalpha=False,
    single_alpha=False,
    use_corr=True,
    logger=get_logger("ridge_corr"),
):
    """Uses ridge regression with a bootstrapped held-out set to get optimal alpha
    values for each response. [nchunks] random chunks of length [chunklen] will be taken
    from [Rstim] and [Rresp] for each regression run.  [nboots] total regression runs
    will be performed.  The best alpha value for each response will be averaged across
    the bootstraps to estimate the best alpha for that response.

    If [joined] is given, it should be a list of lists where the STRFs for all the
    voxels in each sublist will be given the same regularization parameter (the one
    that is the best on average).

    Parameters
    ----------
    Rstim : array_like, shape (TR, N)
        Training stimuli with TR time points and N features. Each feature should be
        Z-scored across time.
    Rresp : array_like, shape (TR, M)
        Training responses with TR time points and M different responses (voxels,
        neurons, what-have-you). Each response should be Z-scored across time.
    Pstim : array_like, shape (TP, N)
        Test stimuli with TP time points and N features. Each feature should be
        Z-scored across time.
    Presp : array_like, shape (TP, M)
        Test responses with TP time points and M different responses. Each response
        should be Z-scored across time.
    alphas : list or array_like, shape (A,)
        Ridge parameters that will be tested. Should probably be log-spaced.
        np.logspace(0, 3, 20) works well.
    nboots : int
        The number of bootstrap samples to run. 15 to 30 works well.
    chunklen : int
        On each sample, the training data is broken into chunks of this length. This
        should be a few times longer than your delay/STRF. E.g. for a STRF with 3
        delays, I use chunks of length 10.
    nchunks : int
        The number of training chunks held out to test ridge parameters for each
        bootstrap sample. The product of nchunks and chunklen is the total number of
        training samples held out for each sample, and this product should be about 20
        percent of the total length of the training data.
    dtype : np.dtype
        All data will be cast as this dtype for computation. np.single is used by
        default for memory efficiency, as using np.double will thrash most machines on a
        big problem. If you want to do regression on complex variables, this should be
        changed to np.complex128.
    corrmin : float in [0..1]
        Purely for display purposes. After each alpha is tested for each bootstrap
        sample, the number of responses with correlation greater than this value will be
        printed. For long-running regressions this can give a rough sense of how well
        the model works before it's done.
    joined : None or list of array_like indices
        If you want the STRFs for two (or more) responses to be directly comparable, you
        need to ensure that the regularization parameter that they use is the same. To
        do that, supply a list of the response sets that should use the same ridge
        parameter here. For example, if you have four responses, joined could be
        [np.array([0,1]), np.array([2,3])], in which case responses 0 and 1 will use the
        same ridge parameter (which will be the parameter that is best on average for
        those two), and likewise for responses 2 and 3.
    singcutoff : float
        The first step in ridge regression is computing the singular value decomposition
        (SVD) of the stimulus Rstim. If Rstim is not full rank, some singular values
        will be approximately equal to zero and the corresponding singular vectors will
        be noise. These singular values/vectors should be removed both for speed (the
        fewer multiplications the better!) and accuracy. Any singular values less than
        singcutoff will be removed.
    normalpha : boolean
        Whether ridge parameters (alphas) should be normalized by the Frobenius norm of
        Rstim. Good for rigorously comparing models with different numbers of
        parameters.
    single_alpha : boolean
        Whether to use a single alpha for all responses. Good for
        identification/decoding.
    use_corr : boolean
        If True, this function will use correlation as its metric of model fit. If
        False, this function will instead use variance explained (R-squared) as its
        metric of model fit. For ridge regression this can make a big difference --
        highly regularized solutions will have very small norms and will thus explain
        very little variance while still leading to high correlations, as correlation
        is scale-free while R**2 is not.

    Returns
    -------
    wt : array_like, shape (N, M)
        Regression weights for N features and M responses.
    corrs : array_like, shape (M,)
        Validation set correlations. Predicted responses for the validation set are
        obtained using the regression weights: pred = np.dot(Pstim, wt), and then the
        correlation between each predicted response and each column in Presp is found.
    alphas : array_like, shape (M,)
        The regularization coefficient (alpha) selected for each voxel using bootstrap
        cross-validation.
    bootstrap_corrs : array_like, shape (A, M, B)
        Correlation between predicted and actual responses on randomly held out portions
        of the training set, for each of A alphas, M voxels, and B bootstrap samples.
    valinds : array_like, shape (TH, B)
        The indices of the training data that were used as "validation" for each
        bootstrap sample.
    """
    nresp, nvox = Rresp.shape
    valinds = []  ## Will hold the indices into the validation data for each bootstrap

    Rcmats = []
    for bi in counter(range(nboots), countevery=1, total=nboots):
        logger.info("Selecting held-out test set..")
        allinds = range(nresp)
        indchunks = list(zip(*[iter(allinds)] * chunklen))
        random.shuffle(indchunks)
        heldinds = list(itools.chain(*indchunks[:nchunks]))
        notheldinds = list(set(allinds) - set(heldinds))
        valinds.append(heldinds)

        RRstim = Rstim[notheldinds, :]
        PRstim = Rstim[heldinds, :]
        RRresp = Rresp[notheldinds, :]
        PRresp = Rresp[heldinds, :]

        ## Run ridge regression using this test set
        Rcmat = ridge_corr(
            RRstim,
            PRstim,
            RRresp,
            PRresp,
            alphas,
            dtype=dtype,
            corrmin=corrmin,
            singcutoff=singcutoff,
            normalpha=normalpha,
            use_corr=use_corr,
        )

        Rcmats.append(Rcmat)

    ## Find weights for each voxel
    try:
        U, S, Vh = np.linalg.svd(Rstim, full_matrices=False)
    except np.linalg.LinAlgError:
        logger.info("NORMAL SVD FAILED, trying more robust dgesvd..")
        from text.regression.svd_dgesvd import svd_dgesvd  # type: ignore

        U, S, Vh = svd_dgesvd(Rstim, full_matrices=False)

    ## Normalize alpha by the norm of the stimulus matrix
    # frob = np.sqrt((S**2).sum()) ## Frobenius!
    frob = S[0]  # NOTE: this is the largest singular value (spectral norm), not the Frobenius norm
    # frob = S.sum()
    logger.info("Total training stimulus has Frobenius norm: %0.03f" % frob)
    if normalpha:
        nalphas = alphas * frob
    else:
        nalphas = alphas

    allRcorrs = np.dstack(Rcmats)
    if not single_alpha:
        logger.info("Finding best alpha for each response..")
        if joined is None:
            ## Find best alpha for each voxel
            meanbootcorrs = allRcorrs.mean(2)
            bestalphainds = np.argmax(meanbootcorrs, 0)
            valphas = nalphas[bestalphainds]
        else:
            ## Find best alpha for each group of voxels
            valphas = np.zeros((nvox,))
            for jl in joined:
                jcorrs = (
                    allRcorrs[:, jl, :].mean(1).mean(1)
                )  ## Mean across voxels in the set, then mean across bootstraps
                bestalpha = np.argmax(jcorrs)
                valphas[jl] = nalphas[bestalpha]
    else:
        logger.info("Finding single best alpha..")
        meanbootcorr = allRcorrs.mean(2).mean(1)
        bestalphaind = np.argmax(meanbootcorr)
        bestalpha = alphas[bestalphaind]
        valphas = np.array([bestalpha] * nvox)
        logger.info("Best alpha = %0.3f" % bestalpha)

    logger.info("Computing weights for each response using entire training set..")
    UR = np.dot(U.T, np.nan_to_num(Rresp))
    pred = np.zeros(Presp.shape)
    wt = np.zeros((Rstim.shape[1], Rresp.shape[1]))
    for ai, alpha in enumerate(nalphas):
        selvox = np.nonzero(valphas == alpha)[0]
        awt = reduce(np.dot, [Vh.T, np.diag(S / (S**2 + alpha**2)), UR[:, selvox]])
        pred[:, selvox] = np.dot(Pstim, awt)
        wt[:, selvox] = awt

    ## Find test correlations
    nnpred = np.nan_to_num(pred)
    corrs = np.nan_to_num(
        np.array(
            [
                np.corrcoef(Presp[:, ii], nnpred[:, ii].ravel())[0, 1]
                for ii in range(Presp.shape[1])
            ]
        )
    )

    return wt, corrs, valphas, allRcorrs, valinds
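
For reference, the weight computation in the final loop above is the SVD form of ridge regression: with the training stimulus factored as $R_{stim} = U S V^\top$, the weights for all responses assigned a given alpha are

$$
W_\alpha = V \, \mathrm{diag}\!\left(\frac{s_i}{s_i^2 + \alpha^2}\right) U^\top Y,
$$

where $Y$ holds the training responses. Note that alpha enters squared, so the values in [alphas] play the role of $\sqrt{\lambda}$ relative to the textbook ridge penalty $\lambda \lVert w \rVert^2$.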

crossval_simple(feature, stories, n_train_stories, test_story, n_repeats, subject, tr_len, ndelays, interpolation, ridge_implementation, use_cache, shuffle, seed, keep_train_stories_in_mem, alphas, nboots, chunklen, nchunks, singcutoff, single_alpha, use_corr)

Run regression for n_repeats and return results.

Parameters:

feature : {"envelope", "eng1000"}, default="envelope"
    Which feature to use for the regression. "envelope": audio envelope that participants heard. "eng1000": embeddings computed from the words that participants heard.

stories : list[str] or None, required
    Pool of stories; the default is determined by the config.yaml.

n_train_stories : int or None, required
    The number of training stories sampled from stories. If this is None, all except one story will be used.

test_story : str or None, default=None
    The story on which the regression models will be tested. If None, the test story will be randomly selected from the pool of stories.

n_repeats : int, default=5
    If cross_validation="simple", determines how often regression is repeated on a different train/test set.

subject : {"UTS01", "UTS02", "UTS03", "UTS04", "UTS05", "UTS06", "UTS07", "UTS08"}, default="UTS01"
    Subject identifier.

tr_len : float, required
    Length of the TR windows used to sample the fMRI data.

ndelays : int, required
    How many delays are used to model the HRF, which is modeled by adding a shifted set of duplicated features for each delay. ndelays=5 implies that the stimulus features are shifted and concatenated 5 times onto the training/testing data.

interpolation : {"lanczos", "average"}, default="lanczos"
    Whether to use lanczos interpolation or just average the words within a TR. Only applies to the 'eng1000' feature.

ridge_implementation : {"ridgeCV", "ridge_huth"}, default="ridgeCV"
    Which implementation of ridge regression to use. "ridgeCV" uses the RidgeCV function from sklearn. "ridge_huth" uses the implementation from the Huth lab codebase, which applies SVD to the data matrix and computes correlation scores with bootstrapping.

alphas : np.ndarray, required
    Array of alpha values to optimize over.

use_cache : bool, required
    Whether the cache is used for envelope features.

shuffle : bool, required
    Whether to shuffle the predictors (features).

seed : int or None, required
    Seed determining the sampling of stories.

keep_train_stories_in_mem : bool, required
    Whether stories are kept in memory after first loading. Unless all stories are used, turning this off will reduce the memory footprint but increase the time spent loading data.

nboots, chunklen, nchunks, singcutoff, single_alpha, use_corr : required
    Passed through to bootstrap_ridge when ridge_implementation="ridge_huth"; see its documentation above.

Returns:

scores : np.ndarray
    The prediction scores for each repeat, stacked into a single array.

weights : list of np.ndarray
    The regression weights for each repeat.

best_alphas : list of float or list of np.ndarray
    The best alphas for each voxel, for each repeat.

Source code in src/encoders/regression.py
def crossval_simple(
    feature: str,
    stories: Union[list[str], None],
    n_train_stories: Optional[int],
    test_story: Optional[str],
    n_repeats: int,
    subject: str,
    tr_len: float,
    ndelays: int,
    interpolation: str,
    ridge_implementation: str,
    use_cache: bool,
    shuffle: bool,
    seed: Optional[int],
    keep_train_stories_in_mem: bool,
    alphas: np.ndarray,
    nboots: int,
    chunklen: int,
    nchunks: int,
    singcutoff: float,
    single_alpha: bool,
    use_corr: bool,
) -> tuple[np.ndarray, list[np.ndarray], list[np.ndarray]]:
    """Run regression for n_repeats and return results.

    Parameters
    ----------
    feature : {"envelope", "eng1000"}
        Which feature to use for the regression. "envelope": audio envelope that
        participants heard. "eng1000": embeddings computed from the words that
        participants heard.)
    stories: list[str], default=STORIES
        Pool of stories, the default is determined by the config.yaml
    n_train_stories: int, optional
        The number of training stories sampled from `stories`. If this is `None`, all
        except one story will be used.
    test_story : str or `None`, default=`None`
        The story on which the regression models will be tested.
        If `None`, the test story will be randomly selected from the pool of stories.
    n_repeats : int, default=5
        If `cross_validation="simple"`, determines how often regression is
        repeated on a different train/test set.
    subject : {"UTS01", "UTS02", "UTS03", "UTS04", "UTS05", "UTS06", "UTS07", "UTS08"},
        default="UTS02"
        Subject identifier
    tr_len: float, default=2.0
        Length of tr-windows used to sample fMRI data.
    ndelays: int, default=4
        How many delays are used to model the HRF, which is modeled by adding
        a shifted set of duplicated features for each delay. `ndelays=5` implies
        that the stimulus features are shifted and concatenated 5 times onto the
        training/testing data.
    interpolation : {"lanczos", "average"}
        Whether to use lanczos interpolation or just average the words within a TR.
        Only applies to the 'eng1000' feature.
    ridge_implementation : {"ridgeCV", "ridge_huth"}, default="ridge_huth"
        Which implementation of ridge regression to use. "ridgeCV" uses the RidgeCV
        function from sklearn.
        "ridge_huth" uses the implementation from the Huth lab codebase which applies
        SVD to the data matrix and computes correlation scores with bootstrapping.
    alphas : np.ndarray
        Array of alpha values to optimize over.
    use_cache: bool
        Whether the cache is used for `envelope` features.
    shuffle: bool
        Whether to shuffle the predictors (features).
    seed: int | None, default=123
        Seed determining sampling of stories.
    keep_train_stories_in_mem: bool
        Whether stories are kept in memory after first loading. Unless all stories are
        used, turning this off will reduce the memory footprint but increase the time
        spent loading data.

    Returns
    -------
    scores : np.ndarray
        The prediction scores for each repeat, stacked into a single array.
    weights : list of np.ndarray
        The regression weights for each repeat.
    best_alphas : list of float or list of np.ndarray
        The best alphas for each voxel, for each repeat.
    """

    if stories is None:
        stories = STORIES
        if not isinstance(stories, list):
            raise ValueError(f"Config parameter invalid: STORIES: {stories}")
    stories = stories.copy()

    if n_train_stories is None:
        n_train_stories = len(stories) - 1

    if test_story is not None and test_story in stories:
        stories.remove(test_story)

    rng = np.random.default_rng(seed=seed)

    # data dicts
    X_data_dict = dict()
    y_data_dict = dict()
    # result arrays
    scores_list = list()
    weights_list = list()
    best_alphas_list = list()
    for repeat in range(n_repeats):
        log.info(f"Repeat {repeat}")

        # 1. choose stories to sample for this repeat
        if test_story is None:
            curr_all_stories = cast(
                list[str],
                rng.choice(stories, size=n_train_stories + 1, replace=False).tolist(),
            )
            curr_train_stories = curr_all_stories[:-1]
            curr_test_stories = curr_all_stories[-1:]
        else:
            curr_train_stories = cast(
                list[str],
                rng.choice(stories, size=n_train_stories, replace=False).tolist(),
            )
            curr_test_stories = [test_story]
            curr_all_stories = [*curr_train_stories, *curr_test_stories]

        # 2. load data for stories
        stories_to_load = list(
            set(curr_all_stories).difference(set(X_data_dict.keys()))
        )
        if len(stories_to_load) > 0:
            log.info("Loading data")
            X_data_dict_new, y_data_dict_new = load_data_dict(
                stories_to_load,
                subject,
                feature,
                interpolation,
                shuffle,
                tr_len,
                ndelays,
                use_cache,
            )
            X_data_dict.update(X_data_dict_new)
            y_data_dict.update(y_data_dict_new)

        # 3. run regression
        log.info(
            f"Running regression | n_train_stories: {n_train_stories}"
            + f" | implementation: {ridge_implementation}"
        )
        if ridge_implementation == "ridge_huth":
            scores, weights, best_alphas = ridge_regression_huth(
                train_stories=curr_train_stories,
                test_stories=curr_test_stories,
                X_data_dict=X_data_dict,
                y_data_dict=y_data_dict,
                score_fct=pearsonr,  # type: ignore
                alphas=alphas,
                nboots=nboots,
                chunklen=chunklen,
                nchunks=nchunks,
                singcutoff=singcutoff,
                single_alpha=single_alpha,
                use_corr=use_corr,
            )
        else:
            scores, weights, best_alphas = ridge_regression(
                train_stories=curr_train_stories,
                test_stories=curr_test_stories,
                X_data_dict=X_data_dict,
                y_data_dict=y_data_dict,
                score_fct=pearsonr,  # type: ignore
            )
        log.info(f"Mean corr: {scores.mean()}")
        log.info(f"Max corr : {scores.max()}")

        if not keep_train_stories_in_mem:
            del X_data_dict
            del y_data_dict
            X_data_dict = dict()
            y_data_dict = dict()

        # 4. append results
        scores_list.append(scores)
        weights_list.append(weights)
        best_alphas_list.append(best_alphas)

    return np.array(scores_list), weights_list, best_alphas_list
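
A hypothetical invocation is sketched below; it will only run with the project's dataset and config.yaml in place, and the story pool, subject, and hyperparameters are illustrative only:

import numpy as np

from encoders.regression import crossval_simple  # assumed import path

scores, weights, best_alphas = crossval_simple(
    feature="eng1000",
    stories=None,                  # fall back to STORIES from config.yaml
    n_train_stories=5,
    test_story=None,               # sample a new test story each repeat
    n_repeats=3,
    subject="UTS02",
    tr_len=2.0,
    ndelays=4,
    interpolation="lanczos",
    ridge_implementation="ridge_huth",
    use_cache=True,
    shuffle=False,
    seed=123,
    keep_train_stories_in_mem=True,
    alphas=np.logspace(1, 3, 10),
    nboots=15,
    chunklen=10,
    nchunks=20,
    singcutoff=1e-10,
    single_alpha=False,
    use_corr=True,
)
print(scores.shape)  # (n_repeats, n_voxels)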

mult_diag(d, mtx, left=True)

Multiply a full matrix by a diagonal matrix. This function should always be faster than dot.

Input:
  d -- 1D (N,) array (contains the diagonal elements)
  mtx -- 2D (N,N) array

Output:
  mult_diag(d, mtx, left=True) == dot(diag(d), mtx)
  mult_diag(d, mtx, left=False) == dot(mtx, diag(d))

By Pietro Berkes.
From http://mail.scipy.org/pipermail/numpy-discussion/2007-March/026807.html

Source code in src/encoders/regression.py
def mult_diag(d, mtx, left=True):
    """Multiply a full matrix by a diagonal matrix.
    This function should always be faster than dot.

    Input:
      d -- 1D (N,) array (contains the diagonal elements)
      mtx -- 2D (N,N) array

    Output:
      mult_diag(d, mtx, left=True) == dot(diag(d), mtx)
      mult_diag(d, mtx, left=False) == dot(mtx, diag(d))

    By Pietro Berkes
    From http://mail.scipy.org/pipermail/numpy-discussion/2007-March/026807.html
    """
    if left:
        return (d * mtx.T).T
    else:
        return d * mtx
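
A quick check of the identities from the docstring (a sketch; the import path is an assumption):

import numpy as np

from encoders.regression import mult_diag  # assumed import path

rng = np.random.default_rng(0)
d = rng.standard_normal(4)
mtx = rng.standard_normal((4, 4))

# Same results as materializing diag(d), without building the N x N diagonal matrix
assert np.allclose(mult_diag(d, mtx, left=True), np.diag(d) @ mtx)
assert np.allclose(mult_diag(d, mtx, left=False), mtx @ np.diag(d))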

pearsonr(x1, x2)

Returns the Pearson correlation between two vectors or two matrices of the same shape (in which case the correlation is computed for each pair of column vectors).

Parameters:

x1 : np.ndarray, required
    shape = (n_samples,) or (n_samples, n_targets)

x2 : np.ndarray, required
    shape = (n_samples,) or (n_samples, n_targets), same shape as x1

Returns:

corr : float or np.ndarray
    Pearson correlation between x1 and x2. If x1 and x2 are matrices, returns an array of correlations with shape (n_targets,).

Source code in src/encoders/regression.py
def pearsonr(x1: np.ndarray, x2: np.ndarray) -> Union[float, np.ndarray]:
    """Returns the pearson correlation between two vectors or two matrices of the same
    shape (in which case the correlation is computed for each pair of column vectors).

    Parameters
    ----------
    x1 : np.ndarray
        shape = (n_samples,) or (n_samples, n_targets)
    x2 : np.ndarray
        shape = (n_samples,) or (n_samples, n_targets), same shape as x1

    Returns
    -------
    corr: float or np.ndarray
        Pearson correlation between x1 and x2. If x1 and x2 are matrices, returns an
        array of correlations with shape (n_targets,)
    """
    return np.mean(zs(x1) * zs(x2), axis=0)
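
For example, with synthetic data (shapes are illustrative):

import numpy as np

from encoders.regression import pearsonr  # assumed import path

rng = np.random.default_rng(0)
x = rng.standard_normal((100, 3))
y = x + 0.5 * rng.standard_normal((100, 3))

print(pearsonr(x[:, 0], y[:, 0]))  # single correlation value
print(pearsonr(x, y))              # shape (3,): one correlation per column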

pearsonr_scorer(estimator, X, y)

Scorer function for RidgeCV that computes the Pearson correlation between the predicted and true values.

Parameters:

estimator : object, required
    A trained scikit-learn estimator with a predict method.

X : np.ndarray, required
    The input data used for prediction.

y : np.ndarray, required
    The true target values.

Returns:

float or np.ndarray
    The Pearson correlation coefficient between the predicted and true values.

Source code in src/encoders/regression.py
def pearsonr_scorer(
    estimator, X: np.ndarray, y: np.ndarray
) -> Union[float, np.ndarray]:
    """Scorer function for RidgeCV that computes the Pearson correlation between the
    predicted and true values.

    Parameters
    ----------
    estimator : object
        A trained scikit-learn estimator with a `predict` method.
    X : np.ndarray
        The input data used for prediction.
    y : np.ndarray
        The true target values.

    Returns
    -------
    float or np.ndarray
        The Pearson correlation coefficient between the predicted and true values.
    """
    y_predict = estimator.predict(X)
    return pearsonr(y, y_predict)

ridge_corr(Rstim, Pstim, Rresp, Presp, alphas, normalpha=False, dtype=np.single, corrmin=0.2, singcutoff=1e-10, use_corr=True, logger=get_logger('ridge_corr'))

Uses ridge regression to find a linear transformation of [Rstim] that approximates [Rresp]. Then tests by comparing the transformation of [Pstim] to [Presp]. This procedure is repeated for each regularization parameter alpha in [alphas]. The correlation between each prediction and each response for each alpha is returned. Note that the regression weights are NOT returned.

Parameters:

Rstim : array_like, shape (TR, N), required
    Training stimuli with TR time points and N features. Each feature should be Z-scored across time.

Pstim : array_like, shape (TP, N), required
    Test stimuli with TP time points and N features. Each feature should be Z-scored across time.

Rresp : array_like, shape (TR, M), required
    Training responses with TR time points and M responses (voxels, neurons, what-have-you). Each response should be Z-scored across time.

Presp : array_like, shape (TP, M), required
    Test responses with TP time points and M responses.

alphas : list or array_like, shape (A,), required
    Ridge parameters to be tested. Should probably be log-spaced. np.logspace(0, 3, 20) works well.

normalpha : boolean, default=False
    Whether ridge parameters should be normalized by the Frobenius norm of Rstim. Good for comparing models with different numbers of parameters.

dtype : np.dtype, default=np.single
    All data will be cast to this dtype for computation. np.single is the default for memory efficiency.

corrmin : float in [0, 1], default=0.2
    Purely for display purposes. After each alpha is tested, the number of responses with correlation greater than corrmin minus the number with correlation less than negative corrmin will be printed. For long-running regressions this vague metric of non-centered skewness can give you a rough sense of how well the model is working before it's done.

singcutoff : float, default=1e-10
    The first step in ridge regression is computing the singular value decomposition (SVD) of the stimulus Rstim. If Rstim is not full rank, some singular values will be approximately equal to zero and the corresponding singular vectors will be noise. These singular values/vectors should be removed both for speed (the fewer multiplications the better!) and accuracy. Any singular values less than singcutoff will be removed.

use_corr : boolean, default=True
    If True, this function will use correlation as its metric of model fit. If False, it will instead use variance explained (R-squared). For ridge regression this can make a big difference: highly regularized solutions have very small norms and will thus explain very little variance while still leading to high correlations, as correlation is scale-free while R**2 is not.

Returns:

Rcorrs : array_like, shape (A, M)
    The correlation between each predicted response and each column of Presp for each alpha.

Source code in src/encoders/regression.py
def ridge_corr(
    Rstim,
    Pstim,
    Rresp,
    Presp,
    alphas,
    normalpha=False,
    dtype=np.single,
    corrmin=0.2,
    singcutoff=1e-10,
    use_corr=True,
    logger=get_logger("ridge_corr"),
):
    """Uses ridge regression to find a linear transformation of [Rstim] that
    approximates [Rresp]. Then tests by comparing the transformation of [Pstim] to
    [Presp]. This procedure is repeated for each regularization parameter alpha in
    [alphas]. The correlation between each prediction and each response for each alpha
    is returned. Note that the regression weights are NOT returned.

    Parameters
    ----------
    Rstim : array_like, shape (TR, N)
        Training stimuli with TR time points and N features. Each feature should be
        Z-scored across time.
    Pstim : array_like, shape (TP, N)
        Test stimuli with TP time points and N features. Each feature should be Z-scored
        across time.
    Rresp : array_like, shape (TR, M)
        Training responses with TR time points and M responses (voxels, neurons,
        what-have-you). Each response should be Z-scored across time.
    Presp : array_like, shape (TP, M)
        Test responses with TP time points and M responses.
    alphas : list or array_like, shape (A,)
        Ridge parameters to be tested. Should probably be log-spaced.
        np.logspace(0, 3, 20) works well.
    normalpha : boolean
        Whether ridge parameters should be normalized by the Frobenius norm of Rstim.
        Good for comparing models with different numbers of parameters.
    dtype : np.dtype
        All data will be cast as this dtype for computation. np.single is used by
        default for memory efficiency.
    corrmin : float in [0..1]
        Purely for display purposes. After each alpha is tested, the number of responses
        with correlation greater than corrmin minus the number of responses with
        correlation less than negative corrmin will be printed. For long-running
        regressions this vague metric of non-centered skewness can give you a rough
        sense of how well the model is working before it's done.
    singcutoff : float
        The first step in ridge regression is computing the singular value decomposition
        (SVD) of the stimulus Rstim. If Rstim is not full rank, some singular values
        will be approximately equal to zero and the corresponding singular vectors will
        be noise. These singular values/vectors should be removed both for speed (the
        fewer multiplications the better!) and accuracy. Any singular values less than
        singcutoff will be removed.
    use_corr : boolean
        If True, this function will use correlation as its metric of model fit. If
        False, this function will instead use variance explained (R-squared) as its
        metric of model fit. For ridge regression this can make a big difference --
        highly regularized solutions will have very small norms and will thus explain
        very little variance while still leading to high correlations, as correlation
        is scale-free while R**2 is not.

    Returns
    -------
    Rcorrs : array_like, shape (A, M)
        The correlation between each predicted response and each column of Presp for
        each alpha.

    """
    ## Calculate SVD of stimulus matrix
    logger.info("Doing SVD...")
    try:
        U, S, Vh = np.linalg.svd(Rstim, full_matrices=False)
    except np.linalg.LinAlgError:
        logger.info("NORMAL SVD FAILED, trying more robust dgesvd..")
        from text.regression.svd_dgesvd import svd_dgesvd  # type: ignore

        U, S, Vh = svd_dgesvd(Rstim, full_matrices=False)

    ## Truncate tiny singular values for speed
    origsize = S.shape[0]
    ngoodS = np.sum(S > singcutoff)
    nbad = origsize - ngoodS
    U = U[:, :ngoodS]
    S = S[:ngoodS]
    Vh = Vh[:ngoodS]
    logger.info(
        "Dropped %d tiny singular values.. (U is now %s)" % (nbad, str(U.shape))
    )

    ## Normalize alpha by the norm of the stimulus matrix
    # frob = np.sqrt((S**2).sum()) ## Frobenius!
    frob = S[0]  # NOTE: this is the largest singular value (spectral norm), not the Frobenius norm
    # frob = S.sum()
    logger.info("Training stimulus has Frobenius norm: %0.03f" % frob)
    if normalpha:
        nalphas = alphas * frob
    else:
        nalphas = alphas

    ## Precompute some products for speed
    UR = np.dot(U.T, Rresp)  ## Precompute this matrix product for speed
    PVh = np.dot(Pstim, Vh.T)  ## Precompute this matrix product for speed

    # Prespnorms = np.apply_along_axis(np.linalg.norm, 0, Presp) ## Precompute test
    # response norms
    zPresp = zs(Presp)
    Prespvar = Presp.var(0)
    Rcorrs = []  ## Holds training correlations for each alpha
    for na, a in zip(nalphas, alphas):
        # D = np.diag(S/(S**2+a**2)) ## Reweight singular vectors by the ridge parameter
        D = S / (
            S**2 + na**2
        )  ## Reweight singular vectors by the (normalized?) ridge parameter

        pred = np.dot(
            mult_diag(D, PVh, left=False), UR
        )  ## Best (1.75 seconds to prediction in test)
        # pred = np.dot(mult_diag(D, np.dot(Pstim, Vh.T), left=False), UR) ## Better
        # (2.0 seconds to prediction in test)

        # pvhd = reduce(np.dot, [Pstim, Vh.T, D]) ## Pretty good (2.4 seconds to
        # prediction in test)
        # pred = np.dot(pvhd, UR)

        # wt = reduce(np.dot, [Vh.T, D, UR]).astype(dtype) ## Bad (14.2 seconds to
        # prediction in test)
        # wt = reduce(np.dot, [Vh.T, D, U.T, Rresp]).astype(dtype) ## Worst
        # pred = np.dot(Pstim, wt) ## Predict test responses

        if use_corr:
            # prednorms = np.apply_along_axis(np.linalg.norm, 0, pred) ## Compute
            # predicted test response norms
            # Rcorr = np.array(
            #    [np.corrcoef(Presp[:,ii], pred[:,ii].ravel())[0,1]
            #     for ii in range(Presp.shape[1])]) ## Slowly compute correlations
            # Rcorr = np.array(np.sum(np.multiply(Presp, pred), 0)).squeeze()/
            #            (prednorms*Prespnorms) ## Efficiently compute correlations
            Rcorr = (zPresp * zs(pred)).mean(0)
        else:
            ## Compute variance explained
            resvar = (Presp - pred).var(0)
            Rcorr = np.clip(1 - (resvar / Prespvar), 0, 1)

        Rcorr[np.isnan(Rcorr)] = 0
        Rcorrs.append(Rcorr)

        log_template = (
            "Training: alpha=%0.3f, mean corr=%0.5f,"
            " max corr=%0.5f, over-under(%0.2f)=%d"
        )
        log_msg = log_template % (
            a,
            np.mean(Rcorr),
            np.max(Rcorr),
            corrmin,
            (Rcorr > corrmin).sum() - (-Rcorr > corrmin).sum(),
        )
        if logger is not None:
            logger.info(log_msg)
        else:
            print(log_msg)

    return Rcorrs
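
A minimal sketch of sweeping alphas with synthetic data (import path and shapes are illustrative assumptions):

import numpy as np

from encoders.regression import ridge_corr  # assumed import path

rng = np.random.default_rng(0)
Rstim = rng.standard_normal((500, 30))
Rresp = rng.standard_normal((500, 8))
Pstim = rng.standard_normal((100, 30))
Presp = rng.standard_normal((100, 8))

alphas = np.logspace(0, 3, 10)
Rcorrs = np.array(ridge_corr(Rstim, Pstim, Rresp, Presp, alphas))
print(Rcorrs.shape)                             # (10, 8): one correlation per alpha and response
best_alpha_per_voxel = alphas[Rcorrs.argmax(axis=0)]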

ridge_regression(train_stories, test_stories, X_data_dict, y_data_dict, score_fct, alphas=np.logspace(1, 3, 10))

Runs cross-validated ridge regression (sklearn's RidgeCV) on the given train stories; returns scores for the test story, the regression weights, and the best alphas.

Parameters:

train_stories : list[str], required
    List of training stories. Stories must be in X_data_dict.

test_stories : list[str], required
    List of testing stories. Stories must be in X_data_dict.

X_data_dict : dict[str, np.ndarray], required
    Dict with story to X_data (features) pairs.

y_data_dict : dict[str, np.ndarray], required
    Dict with story to y_data (fMRI data) pairs.

score_fct : fct(np.ndarray, np.ndarray) -> np.ndarray, required
    A function taking y_test (shape = (number_trs, n_voxels)) and y_predict (same shape as y_test) and returning an array with an entry for each voxel (shape = (n_voxels,)).

alphas : np.ndarray or None, default=np.logspace(1, 3, 10)
    Array of alpha values to optimize over. If None, the default value is used.

Returns:

scores : np.ndarray
    The prediction scores for the test stories.

weights : np.ndarray
    The regression weights.

best_alphas : float or np.ndarray
    The best alphas for each voxel.

Source code in src/encoders/regression.py
def ridge_regression(
    train_stories: list[str],
    test_stories: list[str],
    X_data_dict: dict[str, np.ndarray],
    y_data_dict: dict[str, np.ndarray],
    score_fct: Callable[[np.ndarray, np.ndarray], np.ndarray],
    alphas: Optional[np.ndarray] = np.logspace(1, 3, 10),
) -> tuple[np.ndarray, np.ndarray, Union[float, np.ndarray]]:
    """Runs CVRidgeRegression on given train stories, returns scores for
    test story, regression weights, and the best alphas.

    Parameters
    ----------
    train_stories: list of str
        List of training stories. Stories must be in X_data_dict.
    test_stories: list of str
        List of testing stories. Stories must be in X_data_dict.
    X_data_dict : dict[str, np.ndarray]
        Dict with story to X_data (features) pairs.
    y_data_dict : dict[str, np.ndarray]
        Dict with story to y_data (fMRI data) pairs.
    score_fct : fct(np.ndarray, np.ndarray) -> np.ndarray
        A function taking y_test (shape = (number_trs, n_voxels))
        and y_predict (same shape as y_test) and returning an
        array with an entry for each voxel (shape = (n_voxels)).
    alphas : np.ndarray or `None`, default = `np.logspace(1, 3, 10)`
        Array of alpha values to optimize over. If `None`, will choose
        default value.

    Returns
    -------
    scores : np.ndarray
        The prediction scores for the test stories.
    weights : np.ndarray
        The regression weights.
    best_alphas : float | np.ndarray
        The best alphas for each voxel.
    """
    if alphas is None:
        alphas = np.logspace(1, 3, 10)

    X_train_list = [X_data_dict[story] for story in train_stories]
    y_train_list = [y_data_dict[story] for story in train_stories]
    X_test_list = [X_data_dict[story] for story in test_stories]
    y_test_list = [y_data_dict[story] for story in test_stories]

    X_train_unnormalized = np.concatenate(X_train_list, axis=0)
    y_train = np.concatenate(y_train_list, axis=0)
    X_test_unnormalized = np.concatenate(X_test_list, axis=0)
    y_test = np.concatenate(y_test_list, axis=0)

    X_means = X_train_unnormalized.mean(axis=0)
    X_stds = X_train_unnormalized.std(axis=0)

    X_train = z_score(X_train_unnormalized, X_means, X_stds)
    X_test = z_score(X_test_unnormalized, X_means, X_stds)

    clf = RidgeCV(alphas=alphas, alpha_per_target=True, scoring=pearsonr_scorer)
    clf.fit(X_train, y_train)

    y_predict = clf.predict(X_test)
    scores = score_fct(y_test, y_predict)

    return scores, clf.coef_, clf.alpha_
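
A minimal sketch with synthetic story dicts (story names and shapes are illustrative assumptions):

import numpy as np

from encoders.regression import pearsonr, ridge_regression  # assumed import path

rng = np.random.default_rng(0)
stories = ["story_a", "story_b", "story_c"]
X_data_dict = {s: rng.standard_normal((300, 20)) for s in stories}
y_data_dict = {s: rng.standard_normal((300, 5)) for s in stories}

scores, weights, best_alphas = ridge_regression(
    train_stories=["story_a", "story_b"],
    test_stories=["story_c"],
    X_data_dict=X_data_dict,
    y_data_dict=y_data_dict,
    score_fct=pearsonr,
)
print(scores.shape)   # (5,): one score per voxel
print(weights.shape)  # (5, 20): RidgeCV stores coef_ as (n_targets, n_features)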

ridge_regression_huth(train_stories, test_stories, X_data_dict, y_data_dict, score_fct, alphas, nboots, chunklen, nchunks, singcutoff, single_alpha, use_corr)

Runs cross-validated ridge regression on the given train stories; returns scores for the test story, the regression weights, and the best alphas. Instead of using RidgeCV from sklearn, uses the Huth lab's implementation (https://github.com/HuthLab/deep-fMRI-dataset/blob/master/encoding/ridge_utils/ridge.py).

Parameters:

train_stories : list[str], required
    List of training stories. Stories must be in X_data_dict.

test_stories : list[str], required
    List of testing stories. Stories must be in X_data_dict.

X_data_dict : dict[str, np.ndarray], required
    Dict with story to X_data (features) pairs.

y_data_dict : dict[str, np.ndarray], required
    Dict with story to y_data (fMRI data) pairs.

score_fct : fct(np.ndarray, np.ndarray) -> np.ndarray, required
    A function taking y_test (shape = (number_trs, n_voxels)) and y_predict (same shape as y_test) and returning an array with an entry for each voxel (shape = (n_voxels,)).

alphas : np.ndarray, required
    Array of alpha values to optimize over.

nboots, chunklen, nchunks, singcutoff, single_alpha, use_corr : required
    Passed through to bootstrap_ridge; see its documentation above.

Returns:

scores : np.ndarray
    The prediction scores for the test stories.

weights : np.ndarray
    The regression weights.

best_alphas : float or np.ndarray
    The best alphas for each voxel.

Source code in src/encoders/regression.py
def ridge_regression_huth(
    train_stories: list[str],
    test_stories: list[str],
    X_data_dict: dict[str, np.ndarray],
    y_data_dict: dict[str, np.ndarray],
    score_fct: Callable[[np.ndarray, np.ndarray], np.ndarray],
    alphas: np.ndarray,
    nboots: int,
    chunklen: int,
    nchunks: int,
    singcutoff: float,
    single_alpha: bool,
    use_corr: bool,
) -> tuple[np.ndarray, np.ndarray, Union[float, np.ndarray]]:
    """Runs CVRidgeRegression on given train stories, returns scores for
    test story, regression weights, and the best alphas. Instead of using RidgeCV from
    sklearn, uses the Huth lab's implementation
    (https://github.com/HuthLab/deep-fMRI-dataset/blob/master/encoding/ridge_utils/ridge.py).

    Parameters
    ----------
    train_stories: list of str
        List of training stories. Stories must be in X_data_dict.
    test_stories: list of str
        List of testing stories. Stories must be in X_data_dict.
    X_data_dict : dict[str, np.ndarray]
        Dict with story to X_data (features) pairs.
    y_data_dict : dict[str, np.ndarray]
        Dict with story to y_data (fMRI data) pairs.
    score_fct : fct(np.ndarray, np.ndarray) -> np.ndarray
        A function taking y_test (shape = (number_trs, n_voxels))
        and y_predict (same shape as y_test) and returning an
        array with an entry for each voxel (shape = (n_voxels)).
    alphas : np.ndarray
        Array of alpha values to optimize over.
    nboots, chunklen, nchunks, singcutoff, single_alpha, use_corr
        Passed through to bootstrap_ridge; see its documentation.

    Returns
    -------
    scores : np.ndarray
        The prediction scores for the test stories.
    weights : np.ndarray
        The regression weights.
    best_alphas : float | np.ndarray
        The best alphas for each voxel.
    """

    X_train_list = [X_data_dict[story] for story in train_stories]
    y_train_list = [y_data_dict[story] for story in train_stories]
    X_test_list = [X_data_dict[story] for story in test_stories]
    y_test_list = [y_data_dict[story] for story in test_stories]

    X_train_unnormalized = np.concatenate(X_train_list, axis=0)
    y_train = np.concatenate(y_train_list, axis=0)
    X_test_unnormalized = np.concatenate(X_test_list, axis=0)
    y_test = np.concatenate(y_test_list, axis=0)

    X_means = X_train_unnormalized.mean(axis=0)
    X_stds = X_train_unnormalized.std(axis=0)

    X_train = z_score(X_train_unnormalized, X_means, X_stds)
    X_test = z_score(X_test_unnormalized, X_means, X_stds)

    wt, _, bestalphas, _, _ = bootstrap_ridge(
        X_train,
        y_train,
        X_test,
        y_test,
        alphas,
        nboots=nboots,
        chunklen=chunklen,
        nchunks=nchunks,
        singcutoff=singcutoff,
        single_alpha=single_alpha,
        use_corr=use_corr,
    )
    scores = score_fct(np.dot(X_test, wt), y_test)

    return scores, wt, bestalphas
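
The same kind of sketch for the Huth-lab path, with synthetic data; the chunking is sized so that chunklen * nchunks is 20 percent of the 800 training time points:

import numpy as np

from encoders.regression import pearsonr, ridge_regression_huth  # assumed import path

rng = np.random.default_rng(0)
stories = ["story_a", "story_b", "story_c"]
X_data_dict = {s: rng.standard_normal((400, 20)) for s in stories}
y_data_dict = {s: rng.standard_normal((400, 5)) for s in stories}

scores, wt, best_alphas = ridge_regression_huth(
    train_stories=["story_a", "story_b"],
    test_stories=["story_c"],
    X_data_dict=X_data_dict,
    y_data_dict=y_data_dict,
    score_fct=pearsonr,
    alphas=np.logspace(1, 3, 10),
    nboots=5,
    chunklen=10,
    nchunks=16,        # 160 held-out samples per bootstrap
    singcutoff=1e-10,
    single_alpha=False,
    use_corr=True,
)
print(wt.shape)  # (20, 5): features x voxels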

z_score(data, means, stds)

Return data z-scored by given means and standard deviations stds. Useful to z-score after train/test splits with the same mean in cross-validation settings to prevent data leakage.

Parameters:

data : np.ndarray, required
    shape = (n_samples, n_features) or (n_samples,)

means : np.ndarray, required
    shape = (n_features,) or (1, n_features)

stds : np.ndarray, required
    shape = (n_features,) or (1, n_features)

Returns:

np.ndarray
    Normalized data, same shape as data.

Source code in src/encoders/regression.py
def z_score(data: np.ndarray, means: np.ndarray, stds: np.ndarray) -> np.ndarray:
    """
    Return `data` z-scored by given `means` and standard deviations `stds`. Useful
    to z-score after train/test splits with the same mean in cross-validation settings
    to prevent data leakage.

    Parameters
    ----------
    data : np.ndarray
        shape = (n_samples, n_features) or (n_samples,)
    means : np.ndarray
        shape = (n_features,) or (1, n_features)
    stds : np.ndarray
        shape = (n_features,) or (1, n_features)

    Returns
    -------
    np.ndarray
        Normalized data, same shape as `data`.

    """
    return (data - means) / (stds + 1e-6)
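
For example, computing the statistics on the training split only and applying them to both splits (a sketch; the import path is an assumption):

import numpy as np

from encoders.regression import z_score  # assumed import path

rng = np.random.default_rng(0)
X_train = 3.0 * rng.standard_normal((100, 4)) + 1.0
X_test = 3.0 * rng.standard_normal((20, 4)) + 1.0

means, stds = X_train.mean(axis=0), X_train.std(axis=0)
X_train_z = z_score(X_train, means, stds)
X_test_z = z_score(X_test, means, stds)  # no test-set statistics leak into training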

zs(x)

Returns the z-score of the input array. Z-scores along the first dimension for n-dimensional arrays by default.

Parameters:

x : np.ndarray, required
    Input array. Can be 1D or n-dimensional.

Returns:

z : np.ndarray
    Z-score of x.

Source code in src/encoders/regression.py
def zs(x: np.ndarray) -> np.ndarray:
    """Returns the z-score of the input array. Z-scores along the first dimension for
    n-dimensional arrays by default.

    Parameters
    ----------
    x : np.ndarray
        Input array. Can be 1D or n-dimensional.

    Returns
    -------
    z : np.ndarray
        Z-score of x.
    """
    return (x - x.mean(axis=0)) / (x.std(axis=0) + 1e-6)
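
For example:

import numpy as np

from encoders.regression import zs  # assumed import path

x = np.arange(12, dtype=float).reshape(4, 3)
z = zs(x)
print(z.mean(axis=0))  # ~0 per column
print(z.std(axis=0))   # ~1 per column (slightly below 1 due to the 1e-6 guard)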