Skip to content

features

get_embeddings(story)

Load embeddings, vocabulary and word onset/offset times from the textgrid file.

Source code in src/encoders/features.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
def get_embeddings(story: str) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Load word embeddings and word onset/offset times for a story.

    The "word" tier of the story's textgrid is traversed once. Rows whose text
    is in SKIP_TOKENS are dropped; the remaining words are lower-cased and
    looked up in the embedding vocabulary. Words that are not in the vocabulary
    are represented by an all-zero vector.

    Parameters
    ----------
    story : str
        Story identifier, forwarded to ``load_textgrid``.

    Returns
    -------
    embs : np.ndarray
        One row per kept word and one column per embedding dimension
        (``vecs.shape[0]``); shape ``(0, vecs.shape[0])`` if no word is kept.
    starts : np.ndarray
        ``start`` value of every kept word.
    stops : np.ndarray
        ``stop`` value of every kept word.
    """
    vecs, vocab = load_embeddings()

    word_grid = load_textgrid(story)["word"]

    # Walk the rows a single time; iterrows() is slow and the filter is the
    # same for tokens, starts and stops.
    kept = [
        (row.text.lower(), row.start, row.stop)
        for _, row in word_grid.iterrows()
        if row.text not in SKIP_TOKENS
    ]
    tokens = [text for text, _, _ in kept]
    starts = np.array([start for _, start, _ in kept])
    stops = np.array([stop for _, _, stop in kept])

    n_found = sum(1 for t in tokens if t in vocab)

    log.info(
        f"{n_found}/{len(tokens)}"
        + f" (missing {len(tokens) - n_found}) "
        + "story tokens found in vocab."
    )

    n_dims = vecs.shape[0]
    if not tokens:
        # np.array([]) would be 1-D; keep the documented 2-D layout so callers
        # that index columns or take matrix products do not break.
        return np.zeros((0, n_dims)), starts, stops

    # Embeddings are stored column-wise: column vocab[t] is the vector of t.
    # np.array copies every row, so sharing one zero vector is safe.
    zero_vec = np.zeros(n_dims)
    embs = np.array([vecs[:, vocab[t]] if t in vocab else zero_vec for t in tokens])

    return embs, starts, stops

get_envelope(signal)

Compute the audio envelope

Source code in src/encoders/features.py
18
19
20
21
def get_envelope(signal: np.ndarray) -> np.ndarray:
    """
    Compute the audio envelope of ``signal``.

    The envelope is the element-wise magnitude of ``hilbert(signal)``
    (presumably scipy's analytic-signal transform -- confirm against the
    file's imports).
    """
    log.info("Computing envelope.")
    analytic = hilbert(signal)
    return np.abs(analytic)  # type: ignore

lanczosfun(f_c, t, a=3)

Lanczos function with cutoff frequency f_c.

Parameters:

Name Type Description Default

f_c

float

Cutoff frequency

required

t

ndarray or float

Time

required

a

int

Number of lobes (window size), typically 2 or 3; only signals within the window will have non-zero weights.

3

Returns:

Type Description
ndarray or float

Lanczos function with cutoff frequency f_c

Source code in src/encoders/features.py
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
def lanczosfun(f_c, t, a=3):
    """
    Lanczos function with cutoff frequency f_c.

    Evaluates ``sinc(f_c * t) * sinc(f_c * t / a)`` (normalized sinc) inside
    the window ``|f_c * t| <= a`` and 0 outside of it. The value at
    ``t == 0`` is exactly 1.

    Parameters
    -----------
    f_c : float
        Cutoff frequency
    t : np.ndarray or float
        Time
    a : int
        Number of lobes (window size), typically 2 or 3; only signals within the window
        will have non-zero weights.

    Returns
    --------
    np.ndarray or float
        Lanczos function with cutoff frequency f_c; an array with the shape of
        ``t`` for array input, a plain float for scalar input.
    """
    # Express time in units of the cutoff period so both sinc factors and the
    # window test share one argument.
    x = f_c * np.asarray(t)

    # np.sinc is the normalized sinc and returns 1 at 0, so no 0/0 is ever
    # evaluated and no NaN has to be patched afterwards.
    val = np.sinc(x) * np.sinc(x / a)

    # Zero everything outside the a-lobe window. np.where (rather than masked
    # assignment) also works for scalar input.
    val = np.where(np.abs(x) > a, 0.0, val)

    if val.ndim == 0:
        return float(val)
    return val

lanczosinterp2D(signal, oldtime, newtime, window=3, cutoff_mult=1.0)

Lanczos interpolation for 2D signals; interpolates [signal] from [oldtime] to [newtime], assuming that the rows of [signal] correspond to [oldtime]. Returns a new signal with rows corresponding to [newtime] and the same number of columns as [signal].

Parameters:

Name Type Description Default

signal

ndarray

2-D array of shape (n_samples, n_features)

required

oldtime

ndarray

1-D array of old time points

required

newtime

ndarray

1-D array of new time points

required

window

int

Number of lobes (window size) for the Lanczos function

3

cutoff_mult

float

Multiplier for the cutoff frequency

1.0

Returns:

Type Description
ndarray

2-D array of shape (len(newtime), n_features)

Source code in src/encoders/features.py
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
def lanczosinterp2D(signal, oldtime, newtime, window=3, cutoff_mult=1.0):
    """
    Resample a 2-D signal from [oldtime] onto [newtime] with Lanczos weights.

    Row i of the result is the weighted sum of the rows of [signal], where the
    weight of old sample j is ``lanczosfun(f_c, newtime[i] - oldtime[j])``. The
    cutoff frequency ``f_c`` is the inverse of the largest spacing found in
    [newtime], scaled by [cutoff_mult].

    Parameters
    -----------
    signal : np.ndarray
        2-D array of shape (n_samples, n_features); rows correspond to oldtime
    oldtime : np.ndarray
        1-D array of old time points
    newtime : np.ndarray
        1-D array of new time points
    window : int
        Number of lobes (window size) for the Lanczos function
    cutoff_mult : float
        Multiplier for the cutoff frequency

    Returns
    --------
    np.ndarray
        2-D array of shape (len(newtime), n_features)
    """
    # Cutoff frequency from the coarsest step of the target time axis
    largest_step = np.max(np.abs(np.diff(newtime)))
    f_c = 1 / largest_step * cutoff_mult

    # All pairwise lags at once: lags[i, j] = newtime[i] - oldtime[j]
    lags = np.asarray(newtime)[:, np.newaxis] - oldtime

    # Fill a preallocated float matrix with the Lanczos weights
    weights = np.zeros((len(newtime), len(oldtime)))
    weights[:, :] = lanczosfun(f_c, lags, a=window)

    # Each new sample is a weighted combination of the old samples
    return np.dot(weights, signal)

load_embeddings()

Load the embedding vectors and vocabulary from the EMBEDDINGS_FILE (h5py).

Source code in src/encoders/features.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def load_embeddings() -> Tuple[np.ndarray, Dict]:
    """
    Read the embedding matrix and vocabulary stored in EMBEDDINGS_FILE (h5py).

    Returns
    -------
    data : np.ndarray
        Contents of the file's "data" dataset.
    vocab : Dict
        Maps every entry of the file's "vocab" dataset (decoded as UTF-8) to
        its position in that dataset.
    """
    with h5py.File(EMBEDDINGS_FILE, "r") as h5_file:
        log.info(f"Loading: {EMBEDDINGS_FILE}")

        # Materialize both datasets while the file is still open
        data = np.array(h5_file["data"])

        vocab = {}
        for position, raw_entry in enumerate(np.array(h5_file["vocab"])):
            vocab[raw_entry.decode("utf-8")] = position

        log.info(f"data shape: {data.shape}")
        log.info(f"vocab len: {len(vocab)}")

    return data, vocab

load_envelope_data(story, tr_len, y_data, use_cache=True)

Load the .wav file, compute its envelope, then trim and downsample it to match the number of samples in y_data.

Parameters:

Name Type Description Default

story

str

The story for which to load/compute the envelope.

required

tr_len

float

The time-to-repeat (TR)

required

y_data

ndarray

The data whose sampling frequency should be matched. Uses y_data.shape[0] to determine the downsampling rate.

required

use_cache

bool

Whether or not to save computed results to cache dir (will save to encoders.utils.load_config()['CACHE_DIR'])

True

Returns:

Type Description
ndarray

The envelope data.

Source code in src/encoders/features.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def load_envelope_data(
    story: str,
    tr_len: float,
    y_data: np.ndarray,
    use_cache: bool = True,
) -> np.ndarray:
    """
    Build the envelope regressor for a story, optionally going through a cache.

    Pipeline: load the story's wav data, average the channels if the array is
    2-D, compute the envelope, trim it, downsample it using
    ``y_data.shape[0]`` as the target number of TRs, and append a trailing
    feature axis.

    Parameters
    ----------
    story: str
        The story for which to load/compute the envelope.
    tr_len: float
        The TR length; forwarded to ``downsample`` and part of the cache
        file name.
    y_data: np.ndarray
        Only ``y_data.shape[0]`` is used, as the number of TRs handed to
        ``downsample``.
    use_cache: bool (default = True)
        If True, return the cached array when one exists and store freshly
        computed results under ``CACHE_DIR/envelope_data``.

    Returns
    -------
    np.ndarray
        The envelope data with a trailing axis of length 1 (or whatever array
        was found in the cache).

    """
    # NOTE(review): the cache file name contains story and tr_len but not the
    # number of TRs, so a cached array computed for a different y_data length
    # would be returned unchanged -- confirm this is intended.
    cache_file = Path(CACHE_DIR, "envelope_data", f"{story}_{tr_len}_X.npy")
    cache_hit = cache_file.exists()

    if cache_hit and use_cache:
        log.info(f"Loading from cache: {cache_file}")
        return np.load(cache_file)
    if use_cache:
        log.info(f"No data found in cache: {cache_file}")

    n_trs = y_data.shape[0]

    sfreq, audio = load_wav(story)

    # A 2-D wav array holds one column per channel; collapse to mono
    if len(audio.shape) == 2:
        log.info("Wav has 2 channels, averaging across chanel dimension.")
        audio = np.mean(audio, axis=1)

    envelope = get_envelope(audio)
    trimmed = trim(envelope, sfreq)
    resampled = downsample(trimmed, sfreq, tr_len, n_trs)
    X_data = resampled[:, np.newaxis]

    if not use_cache:
        return X_data

    os.makedirs(cache_file.parent, exist_ok=True)
    np.save(cache_file, X_data)
    log.info("Cached results.")
    return X_data

make_delayed(signal, delays, circpad=False)

Create delayed versions of the 2-D signal.

Parameters:

Name Type Description Default

signal

ndarray

2-D array of shape (n_samples, n_features)

required

delays

ndarray

1-D array of delays to apply to the signal. Delays can be positive or negative; negative values advance the signal (shifting it backward).

required

circpad

bool

If True, use circular padding for delays. If False, use zero padding for delays.

False

Returns:

Type Description
ndarray

2-D array of shape (n_samples, n_features * ndelays)

Source code in src/encoders/features.py
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
def make_delayed(signal: np.ndarray, delays: np.ndarray, circpad=False) -> np.ndarray:
    """
    Stack time-shifted copies of a 2-D signal side by side.

    Parameters
    -----------
    signal : np.ndarray
        2-D array of shape (n_samples, n_features)
    delays : np.ndarray
        1-D array of integer shifts, one output block per entry. A positive
        value moves the rows down (later in time), a negative value moves them
        up (earlier in time), and 0 yields a plain copy.
    circpad : bool
        If True, rows shifted past one end wrap around to the other end.
        If False, the vacated rows are filled with zeros.

    Returns
    --------
    np.ndarray
        2-D array of shape (n_samples, n_features * ndelays); the blocks appear
        in the order of [delays].
    """
    # Unpacking the shape is kept on purpose: it rejects non-2-D input early.
    _n_rows, _n_cols = signal.shape

    shifted_blocks = []
    for lag in delays:
        if circpad:
            block = np.roll(signal, lag, axis=0)
        elif lag == 0:
            block = signal.copy()
        else:
            block = np.zeros_like(signal)
            if lag > 0:
                # Drop the last `lag` rows, push the rest down
                block[lag:, :] = signal[:-lag, :]
            else:
                # Drop the first `-lag` rows, pull the rest up
                block[:lag, :] = signal[-lag:, :]
        shifted_blocks.append(block)

    return np.concatenate(shifted_blocks, axis=1)

sinc(f_c, t)

Sinc function with cutoff frequency f_c.

Parameters:

Name Type Description Default

f_c

float

Cutoff frequency

required

t

ndarray or float

Time

required

Returns:

Type Description
ndarray or float

Sinc function with cutoff frequency f_c

Source code in src/encoders/features.py
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
def sinc(f_c, t):
    """
    Normalized sinc function with cutoff frequency f_c.

    Computes ``sin(pi * f_c * t) / (pi * f_c * t)`` and returns 1 where
    ``f_c * t == 0`` (the limit value) instead of dividing by zero.

    Parameters
    -----------
    f_c : float
        Cutoff frequency
    t : np.ndarray or float
        Time

    Returns
    --------
    np.ndarray or float
        Sinc function with cutoff frequency f_c
    """
    # np.sinc(x) is sin(pi * x) / (pi * x) with the removable singularity at
    # x == 0 filled in, so no NaN / RuntimeWarning is produced at t == 0.
    return np.sinc(f_c * t)