Time-Series Training Partitions

Will Leeney (@willleeney)
I'd like to preface this by saying that this is probably not new. I'm not sure whether it is or not, but I'm going to pretend it is. I've had a bit of a google in this space and found some resources (time-series-splitting-techniques & cross-validation-in-time-series). Obviously there's a whole lot more out there and I'm not going to pretend otherwise, so if you are one of the clever folks who has already done the method described here, please let me know. Or don't. Seethe silently if that's your preference.
What exactly was your problem, why couldn't you use the existing methods?
The partitions should reflect the real-world deployment scenario (I have written about this in a lot of detail in my thesis [1]). My scenario was that I had multiple time-series, each of a different length. The testing partitions needed to reflect that new time-series would be collected and that the existing time-series would be updated with a variable amount of new data.
The best existing way of carrying out cross-validation on time-series data is the split function shown in Figure 1. This works well when the time-series are all of equal length, but when they aren't, the evaluation is biased towards the longer time-series. To address this, we can use a modified version of the split that takes the length of each time-series into account.
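For a concrete reference point, the standard expanding-window split is what scikit-learn's TimeSeriesSplit gives you. A minimal sketch (the toy data and the use of scikit-learn here are my own illustration; Figure 1 just shows the general idea):

```python
import numpy as np
from sklearn.model_selection import TimeSeriesSplit

# toy data: a single series of 20 timestamps with 3 features
X = np.random.rand(20, 3)

# expanding-window cross-validation: each test block comes after its training block
tscv = TimeSeriesSplit(n_splits=5)
for train_idx, test_idx in tscv.split(X):
    print(f"train: {train_idx[0]}-{train_idx[-1]}, test: {test_idx[0]}-{test_idx[-1]}")
```

This is fine for a single series (or several of identical length), but it has no notion of per-series lengths, which is the gap this post is about.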
What is the fix that you propose?
Well first, let me say what the constraints are, i.e. the partitioning method needs to maintain the temporal order of the time-series, which means that the testing data needs to come after the training data in each split (see caveat 1 below for a discussion on this point). The testing partitions need to be independent for cross-validation, so each test partition needs to contain different bits of the data. We also want to generalise to entirely new time-series, so a proportion of the time-series needs to be held out for testing.
From here, the whole idea is rather simple. Set a minimum chunk of testing data that you want to be able to test on, `min_test_chunk_size`, then, starting from the end of each time-series, split the series into these chunks. Then, for each of the `n_splits` that you want, iteratively assign a chunk to each split until you get roughly the percentage that you need in each split. You can see an illustration of this chunking time-series cross-validation in Figure 2 (it argued to be called Throckmorten but I didn't really feel like it was capable of pulling that off). This method is cool because you can also implement a `min_training_len`, which means you don't have to worry about the training length being too short.
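To make the chunking step concrete, here's a minimal sketch of how the candidate test-chunk end positions for a single series could be computed (the helper name is mine; the logic mirrors what the full function below does internally):

```python
def candidate_test_ends(length: int, min_test_chunk_size: int, min_training_len: int = 0) -> list:
    """Walk back from the end of a series in steps of min_test_chunk_size,
    collecting the end index of each candidate test chunk."""
    ends = []
    for end in range(length, min_training_len, -min_test_chunk_size):
        ends.insert(0, end)  # keep the list in ascending order
    return ends

# e.g. a series of length 23 with chunks of 4 gives ends at [3, 7, 11, 15, 19, 23]
print(candidate_test_ends(23, 4))
```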
Code
Now you could implement this yourself... or ask for help from a friendly AI. But if you are feeling really lazy, as programmers often are, then you can use this function which I have written. This assumes that the data is in a 3D numpy array and that you have another numpy array of length `n_instances` which specifies the length of each time-series. This should pretty much work, although I have read many blogs where the code doesn't actually work, so I'm making no promises.
import math
from typing import List, Tuple

import numpy as np


def time_series_split(
    data: np.ndarray,
    ts_end: np.ndarray,
    n_folds: int = 5,
    train_pct: float = 0.6,
    val_pct: float = 0.1,
    min_test_length: int = 4,
    min_train_length: int = 0,
    test_only_pct: float = 0.2,
) -> List[Tuple[np.ndarray, np.ndarray, np.ndarray]]:
    """
    Creates k-fold splits for variable-length time series with fixed-length test segments.

    Parameters
    ----------
    data : np.ndarray [n_instances, n_timestamps, n_features]
        Input time series data
    ts_end : np.ndarray [n_instances]
        Last valid timestamp index for each time series
    n_folds : int, default=5
        Number of cross-validation folds
    train_pct : float, default=0.6
        Target fraction of each time series used for training
    val_pct : float, default=0.1
        Target fraction of each time series used for validation
    min_test_length : int, default=4
        Minimum length of test segments
    min_train_length : int, default=0
        Minimum number of timestamps that must be left for training
    test_only_pct : float, default=0.2
        Percentage of instances reserved entirely for testing

    Returns
    -------
    List of n_folds tuples containing (train_end, val_end, test_end) index arrays
    """
    n_instances = data.shape[0]

    # 1. Designate test-only instances
    n_test_only = math.ceil(n_instances * test_only_pct)
    n_test_only = min(n_test_only, n_instances // 2)  # limit to half the instances
    test_only_indices = np.random.choice(n_instances, size=n_test_only, replace=False)
    remaining_indices = np.array([i for i in range(n_instances) if i not in test_only_indices])

    # add any test-only instances that don't fit evenly into folds back into remaining_indices
    if len(test_only_indices) > n_folds:
        test_only_per_fold = max(1, n_test_only // n_folds)
        remaining_test_only = n_test_only % n_folds
        if remaining_test_only > 0:
            remaining_indices = np.concatenate(
                (remaining_indices, test_only_indices[-remaining_test_only:])
            )
    else:
        test_only_per_fold = 1

    # 2. For remaining instances, create valid-length testing segments
    instance_segments = {}
    for i in remaining_indices:
        length = ts_end[i]
        if length < min_test_length * 2:  # skip if less than 2x min_test_length
            continue
        # Calculate valid test segment end positions, starting from the end of the
        # sequence and working backward in steps of min_test_length
        valid_test_ends = []
        for test_end in range(length, min_train_length, -min_test_length):
            valid_test_ends.insert(0, test_end)
        # Remove the full length as we always want to have some training data
        if len(valid_test_ends) > 1 and valid_test_ends[0] == length:
            valid_test_ends = valid_test_ends[1:]
        if valid_test_ends:
            instance_segments[i] = valid_test_ends

    # Count total available segments
    total_segments = sum(len(x) for x in instance_segments.values())

    # Build a segment-choice mask: one row per instance, one column per candidate segment
    max_choice_segment = max(len(x) for x in instance_segments.values())
    segment_opt = np.zeros((n_instances, max_choice_segment), dtype=bool)
    for instance_idx, value_list in instance_segments.items():
        segment_opt[instance_idx, :len(value_list)] = True

    # Calculate segments per fold
    segments_per_fold = int((total_segments // n_folds) * (1 - train_pct - val_pct) * (1 - test_only_pct))
    # Ensure segments_per_fold does not exceed the number of segments available per fold
    max_segments_per_fold = math.floor(np.sum(segment_opt) / n_folds)
    if max_segments_per_fold < segments_per_fold:
        print(f"Not enough segments for fold, adjusting to max possible ({segments_per_fold} -> {max_segments_per_fold})")
        segments_per_fold = max_segments_per_fold
    if segments_per_fold == 0:
        print('Warning: Not enough segments for fold')
        print(f"total_segments: {total_segments}, n_folds: {n_folds}, segments_per_fold: {segments_per_fold}")
        segments_per_fold = 1  # ensure at least one segment per fold

    # Initialize results
    all_folds = []

    # 3. Assign segments to folds
    for fold in range(n_folds):
        # Initialize end indices for all instances
        train_end = np.zeros(n_instances, dtype=np.int32)
        val_end = np.zeros(n_instances, dtype=np.int32)
        test_end = np.zeros(n_instances, dtype=np.int32)

        # Handle test-only instances (rotated across folds), if any are left for this fold
        if fold < len(test_only_indices):
            test_only_fold = test_only_indices[fold * test_only_per_fold:(fold + 1) * test_only_per_fold]
            for idx in test_only_fold:
                train_end[idx] = 0
                val_end[idx] = 0
                test_end[idx] = ts_end[idx]

        fold_segments = []
        for make_seg_choice in range(segments_per_fold):
            # instances that still have at least one unassigned segment
            segment_choices = np.argwhere(segment_opt[:, 0])[:, 0]
            # choose a random instance from segment_choices
            seg_idx_choice = np.random.choice(segment_choices)
            # take the last remaining segment for this instance
            last_valid_segment = np.where(segment_opt[seg_idx_choice])[0][-1]
            # mask out the segment choice so no other fold can reuse it
            segment_opt[seg_idx_choice, last_valid_segment] = False
            # look up the end index of the chosen segment
            segment_end = instance_segments[seg_idx_choice][last_valid_segment]
            fold_segments.append((seg_idx_choice, segment_end))

        # get unique instances from segments
        fold_instances = set(x[0] for x in fold_segments)
        # assign segments to end indices
        for instance in fold_instances:
            segments_for_instance = [x[1] for x in fold_segments if x[0] == instance]
            test_end[instance] = max(segments_for_instance)
            # the start of the earliest test segment becomes the end of validation
            val_end[instance] = min(segments_for_instance) - min_test_length
            train_end[instance] = int(val_end[instance] - (val_pct * ts_end[instance]))
            # if validation or training would fall below min_train_length, clamp them back
            if val_end[instance] < min_train_length:
                val_end[instance] = min_train_length
            if train_end[instance] < min_train_length:
                train_end[instance] = val_end[instance]
            # assert no negative lengths
            if train_end[instance] < 0 or val_end[instance] < 0:
                train_end[instance] = 0
                val_end[instance] = 0

        # assign any remaining instances (where test_end is 0) entirely to training
        only_train = np.where(test_end == 0)[0]
        for instance in only_train:
            if instance not in test_only_indices:
                train_end[instance] = ts_end[instance]
                val_end[instance] = ts_end[instance]
                test_end[instance] = ts_end[instance]

        all_folds.append((train_end, val_end, test_end))

    return all_folds
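And here's roughly how you'd call it; the synthetic data below is purely for illustration (random values, with the variable lengths stored in ts_end):

```python
import numpy as np

# 20 padded series of up to 100 timestamps with 3 features; true lengths in ts_end
rng = np.random.default_rng(0)
data = rng.normal(size=(20, 100, 3))
ts_end = rng.integers(low=30, high=100, size=20)

folds = time_series_split(data, ts_end, n_folds=5)
for fold_idx, (train_end, val_end, test_end) in enumerate(folds):
    # my reading of the returned end indices: training for instance i is data[i, :train_end[i]],
    # validation is data[i, train_end[i]:val_end[i]], test is data[i, val_end[i]:test_end[i]]
    print(fold_idx, train_end[:5], val_end[:5], test_end[:5])
```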
That's it. That's the blog. I'd love to do some experiments but I won't.
Caveats...
- I'm not sure that maintaining the temporal order of a time-series in the training data is always the best approach. For instance, in NLP it is common practice to mask words in the middle of a sentence and get the model to predict the masked words. Here the temporal order of the time-series (represented by a sentence) is not maintained, and the disorder is leveraged to provide a training paradigm for the model. In this situation, it seems perfectly reasonable not to maintain the temporal order of the split, and it is in fact useful to play around with it. Therefore, there are situations in which it is not necessary to maintain temporality for generalisation testing. What is the answer? As with all things, it really depends on what you are trying to achieve... I'm trying to illustrate a different way of partitioning time-series that is more suitable for time-series of different lengths, so I'm not really concerned with this point (for now... I am actually really interested in exploring this more, but for the sake of the reader's time and my own sanity I need to move on).
References: