Time-Series Training Partitions

I'd like to preface this by saying that this is probably not new. I'm not sure whether it's new or not, but I'm going to pretend it is. I've had a bit of a google in this space and found some resources (time-series-splitting-techniques & cross-validation-in-time-series). Obviously there's a whole lot more out there and I'm not going to pretend otherwise, so if you are one of the clever folks who has already done the method described here, please let me know. Or don't. Seethe silently if that's your preference.

What exactly was your problem, and why couldn't you use the existing methods?

The partitions should reflect the real-world deployment scenario (I have written about this in a lot of detail in my thesis [1]). My scenario was that I had multiple time-series, each of a different length. The testing partitions needed to reflect that new time-series would be collected and that the existing time-series would be updated with a variable amount of new data.

The standard way of carrying out cross-validation on time-series data is to use the split shown in Figure 1. This works well when the time-series are all of equal length, but when they differ in length the evaluation will be biased towards the longer time-series. To address this, we can use a modified version of the split that takes the length of each time-series into account.

[Image: original_split]
Figure 1: Traditional time-series cross-validation split showing train/test partitions.
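
For concreteness, the scheme in Figure 1 is essentially what scikit-learn's TimeSeriesSplit gives you out of the box. Here's a minimal sketch on a single toy series:

import numpy as np
from sklearn.model_selection import TimeSeriesSplit

X = np.arange(10).reshape(-1, 1)  # one toy series: 10 timestamps, 1 feature

# Expanding-window splits: each test block comes strictly after its training data
for train_idx, test_idx in TimeSeriesSplit(n_splits=3).split(X):
    print(f"train: {train_idx}, test: {test_idx}")
# train: [0 1 2 3], test: [4 5]
# train: [0 1 2 3 4 5], test: [6 7]
# train: [0 1 2 3 4 5 6 7], test: [8 9]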

What is the fix that you propose?

Well first, let me say what the constraints are. The partitioning method needs to maintain the temporal order of the time-series, which means that the testing data needs to come after the training data in each split (see caveat 1 below for a discussion on this point). The testing partitions need to be independent for cross-validation, so each test partition needs to contain different parts of the data. And we want to generalise to entirely new time-series, so a proportion of the time-series needs to be held out entirely for testing.

From here, the whole idea is rather simple. Set a minimum chunk of testing data that you want to be able to test on (min_test_length in the code below), then, starting from the end of each time-series, split the series into these chunks. Then, for each of the n_splits that you want, iteratively assign a chunk to each split until each split has roughly the percentage of data that you need. You can see an illustration of this chunking time-series cross-validation in Figure 2 (it argued to be called Throckmorten, but I didn't really feel like it was capable of pulling that off); a toy sketch of the chunking follows the figure. This method is also nice because you can implement a min_train_length, which means you don't have to worry about the training length being too short.

[Image: upgraded_time_split]
Figure 2: Chunking time-series cross-validation split showing train/test partitions.
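
To make the chunking concrete, here's a tiny sketch (my own toy illustration, not the full function below) of enumerating the valid test-chunk end positions for one series and dealing them out round-robin across splits:

length = 23           # length of one time-series
min_test_length = 4   # minimum test chunk size
n_splits = 3

# Walk back from the end of the series in steps of min_test_length;
# each value is a candidate *end* index for a test chunk
chunk_ends = sorted(range(length, 0, -min_test_length))  # [3, 7, 11, 15, 19, 23]

# Drop the full-length end so some data is always left over for training
if len(chunk_ends) > 1:
    chunk_ends = chunk_ends[:-1]                         # [3, 7, 11, 15, 19]

# Deal the chunks out round-robin so each split gets a disjoint share
splits = {k: [] for k in range(n_splits)}
for i, end in enumerate(chunk_ends):
    splits[i % n_splits].append(end)
print(splits)  # {0: [3, 15], 1: [7, 19], 2: [11]}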

Code

Now you could implement this yourself... or ask for help from a friendly AI. But if you are feeling really lazy, as programmers often are, then you can use this function which I have written. It assumes that the data is in a 3D numpy array, padded to a common length, and that you have another numpy array of length n_instances which specifies the length of each time-series (there's a short usage sketch after the function). This should pretty much work, although I have read many blogs where the code doesn't actually work, so I'm making no promises.

import math
from typing import List, Tuple

import numpy as np


def time_series_split(
    data: np.ndarray,
    ts_end: np.ndarray,
    n_folds: int = 5,
    train_pct: float = 0.6,
    val_pct: float = 0.1,
    min_test_length: int = 4,
    min_train_length: int = 0,
    test_only_pct: float = 0.2,
) -> List[Tuple[np.ndarray, np.ndarray, np.ndarray]]:
    """
    Creates k-fold splits for variable-length time series with fixed-length test segments.

    Parameters:
    -----------
    data : np.ndarray [n_instances, n_timestamps, n_features]
        Input time series data
    ts_end : np.ndarray [n_instances]
        Last valid timestamp index for each time series
    n_folds : int, default=5
        Number of cross-validation folds
    train_pct : float, default=0.6
        Target percentage for training data (training / (training + validation))
    val_pct : float, default=0.1
        Target percentage for validation data (validation / (training + validation))
    min_test_length : int, default=4
        Minimum length of test segments
    min_train_length : int, default=0
        Minimum number of training timestamps to keep at the start of each series
    test_only_pct : float, default=0.2
        Percentage of instances reserved entirely for testing

    Returns:
    --------
    List of n_folds tuples containing (train_end_idx, val_end_idx, test_end_idx)
    """

    n_instances = data.shape[0]

    # 1. Designate test-only instances
    n_test_only = math.ceil(n_instances * test_only_pct)
    n_test_only = min(n_test_only, n_instances // 2)  # Limit to half the instances
    test_only_indices = np.random.choice(n_instances, size=n_test_only, replace=False)
    remaining_indices = np.setdiff1d(np.arange(n_instances), test_only_indices)

    # Fold any leftover test-only instances that don't divide evenly across
    # the folds back into the remaining pool
    if len(test_only_indices) > n_folds:
        test_only_per_fold = max(1, n_test_only // n_folds)
        remaining_test_only = n_test_only % n_folds
        if remaining_test_only > 0:
            remaining_indices = np.concatenate(
                (remaining_indices, test_only_indices[-remaining_test_only:])
            )
    else:
        test_only_per_fold = 1


    # 2. For remaining instances, create valid length testing segments
    instance_segments = {}

    for i in remaining_indices:
        length = ts_end[i]
        if length < min_test_length * 2:  # Skip if less than 2x min_test_length
            continue

        # Calculate valid test segment end positions, working backward from
        # the end of the sequence in steps of min_test_length
        valid_test_ends = []
        for test_end in range(length, min_train_length, -min_test_length):
            valid_test_ends.insert(0, test_end)

        # Remove the full-length end as we always want some training data left
        # (the list is ascending, so the full length sits at the end)
        if len(valid_test_ends) > 1 and valid_test_ends[-1] == length:
            valid_test_ends = valid_test_ends[:-1]

        if valid_test_ends:
            instance_segments[i] = valid_test_ends

    # Count total available segments
    total_segments = sum(len(v) for v in instance_segments.values())
    # Build a per-instance mask of which segment slots are still unassigned
    # (slots are packed to the left for each instance)
    max_choice_segment = max(len(v) for v in instance_segments.values())
    segment_opt = np.zeros((n_instances, max_choice_segment), dtype=bool)
    for instance_idx, value_list in instance_segments.items():
        segment_opt[instance_idx, :len(value_list)] = True

    # Calculate segments per fold
    segments_per_fold = int((total_segments // n_folds) * (1 - train_pct - val_pct) * (1 - test_only_pct))

    # Cap segments_per_fold at the number of segments actually available per fold
    max_segments_per_fold = math.floor(np.sum(segment_opt) / n_folds)
    if max_segments_per_fold < segments_per_fold:
        print(f"Not enough segments for fold, adjusting to max possible ({segments_per_fold} -> {max_segments_per_fold})")
        segments_per_fold = max_segments_per_fold

    if segments_per_fold == 0:
        print('Warning: Not enough segments for fold')
        print(f"total_segments: {total_segments}, n_folds: {n_folds}, segments_per_fold: {segments_per_fold}")
        segments_per_fold = 1  # Ensure at least one segment per fold


    # Initialize results
    all_folds = []

    # 3. Assign segments to folds
    for fold in range(n_folds):
        # Initialize end indices for all instances
        train_end = np.zeros(n_instances, dtype=np.int32)
        val_end = np.zeros(n_instances, dtype=np.int32)
        test_end = np.zeros(n_instances, dtype=np.int32)

        # Rotate test-only instances across folds (later folds may get none
        # when there are fewer test-only instances than folds)
        if fold < len(test_only_indices):
            test_only_fold = test_only_indices[fold * test_only_per_fold:(fold + 1) * test_only_per_fold]
            for idx in test_only_fold:
                train_end[idx] = 0
                val_end[idx] = 0
                test_end[idx] = ts_end[idx]

        fold_segments = []
        for _ in range(segments_per_fold):
            # Instances that still have at least one unassigned segment
            # (column 0 is True iff any slots remain, since slots are packed left)
            segment_choices = np.argwhere(segment_opt[:, 0])[:, 0]
            # Choose a random instance from those available
            seg_idx_choice = np.random.choice(segment_choices)
            # Take its last remaining (latest-ending) segment and mask it out
            last_valid_segment = np.where(segment_opt[seg_idx_choice])[0][-1]
            segment_opt[seg_idx_choice, last_valid_segment] = False
            segment_end = instance_segments[seg_idx_choice][last_valid_segment]
            fold_segments.append((seg_idx_choice, segment_end))

        # get unique instances from segments
        fold_instances = set([x[0] for x in fold_segments])
        # assign segments to end_indices
        for instance in fold_instances:
            segments_for_instance = [x[1] for x in fold_segments if x[0] == instance]
            test_end[instance] = max(segments_for_instance)
            # assign start of test as end of val
            val_end[instance] = min(segments_for_instance) - min_test_length
            train_end[instance] = int(val_end[instance] - (val_pct * ts_end[instance]))
            # Clamp the validation end so it never falls below min_train_length
            if val_end[instance] < min_train_length:
                val_end[instance] = min_train_length

            # If training would be too short, fold the validation span into training
            if train_end[instance] < min_train_length:
                train_end[instance] = val_end[instance]

            # Guard against negative indices
            if train_end[instance] < 0 or val_end[instance] < 0:
                train_end[instance] = 0
                val_end[instance] = 0

        # Any remaining instances (where test_end is still 0) are used entirely
        # for training in this fold
        only_train = np.where(test_end == 0)[0]
        for instance in only_train:
            if instance not in test_only_indices:
                train_end[instance] = ts_end[instance]
                val_end[instance] = ts_end[instance]
                test_end[instance] = ts_end[instance]

        all_folds.append((train_end, val_end, test_end))

    return all_folds
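
And here's a quick usage sketch so you can see the expected shapes (the toy sizes, seed and variable names are mine, purely for illustration):

import numpy as np

rng = np.random.default_rng(0)
n_instances, max_len, n_features = 10, 50, 3

# Zero-padded 3D array plus the true length of each series
data = rng.normal(size=(n_instances, max_len, n_features))
ts_end = rng.integers(20, max_len + 1, size=n_instances)

folds = time_series_split(data, ts_end, n_folds=5)
train_end, val_end, test_end = folds[0]
# The fold-0 slices for instance i are then:
#   train: data[i, :train_end[i]]
#   val:   data[i, train_end[i]:val_end[i]]
#   test:  data[i, val_end[i]:test_end[i]]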

That's it. That's the blog. I'd love to do some experiments but I won't.

Caveats...

  1. I'm not sure that maintaining the temporal order of a time-series in the training data is always the best approach. For instance, in NLP tasks it is common practice to mask words in the middle of a sentence and have the model predict the masked words. Here the temporal order of the time-series (represented by a sentence) is not maintained, and the disorder is leveraged to provide a training paradigm for the model. In this situation, it seems perfectly reasonable not to maintain the temporal order of the split, and it is in fact useful to play around with it. Therefore, there are situations in which it is not necessary to maintain temporality for generalisation testing. What is the answer? As with all things, it really depends on what you are trying to achieve... I'm trying to illustrate a different way of partitioning time-series that is more suitable for time-series of different lengths, so I'm not really concerned with this point (for now... I am actually really interested in exploring this more, but for the sake of the reader's time and my own sanity I need to move on).

References:

[1] W. Leeney, "Unsupervised Graph Neural Networks," PhD thesis, 2024.