Dataset post processing #1505

Closed
albertz opened this issue Jan 31, 2024 · 24 comments · Fixed by #1596

@albertz
Member

albertz commented Jan 31, 2024

Examples of post-processing:

  • Raw audio is stored in the HDFDataset, do feature extraction on-the-fly (but not in the network, but instead as part of the dataset).
  • Ogg is stored in the HDFDataset, do decoding on-the-fly into raw audio.
  • Apply speed perturbation on-the-fly.
  • Text (UTF8 bytes) is stored in the HDFDataset, apply BPE (or any Vocabulary) on-the-fly.
  • Concatenate sequences on-the-fly (ConcatSeqsDataset with extended functionality #1573). I.e. the post-processing is not just on a single sequence.
  • Apply mixup, i.e. mix in data from previous sequences. But this buffer would only be kept local, and reset after each epoch.

Some datasets already have partial support for post processing. Examples:

  • OggZipDataset targets, can be any type of Vocabulary (e.g. BytePairEncoding, but also SamplingBytePairEncoding, or SentencePieces). Similarly ExternSprintDataset orth_vocab.
  • ExtractAudioFeatures is used in a couple of places, e.g. by OggZipDataset audio. It supports also pre_process (on raw audio) and post_process (on audio features).

There was the idea about storing generic raw audio (or maybe even Ogg) inside the HDFDataset. And similarly, there was also the idea about storing the text (UTF8 bytes) inside the HDFDataset. In both cases, you would then maybe want to transform those into audio features or BPE labels on-the-fly as part of the dataset.

There are multiple options how to implement this:

  • Similarly as in OggZipDataset, extend some other dataset (e.g. HDFDataset) by such functionality. But how to do it in a somewhat generic and flexible way? One aspect to keep in mind is that this might also change the dimension or shape of the data. E.g. raw audio to audio features will add one dimension.
  • Make another dedicated dataset just for such transformation, similarly as MetaDataset. Or maybe make it part of MetaDataset?
  • Make this a separate mechanism in RETURNN, which we can apply after the dataset logic? Which e.g. operates on TensorDict (before batching), or on individual data streams. But the distinction when something should be done as part of the dataset and when it would be done as such post-processing would be kind of arbitrary.
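
To make the shape change mentioned in the first option concrete, here is a minimal sketch in plain Python. It is not RETURNN's ExtractAudioFeatures: the function `frame_features`, the window and hop sizes, and the two toy features per frame are all assumptions made for illustration. The only point is that a raw audio sequence of shape [T] turns into a feature sequence of shape [T', D].

```python
import math
from typing import List


def frame_features(samples: List[float], win: int = 4, hop: int = 2) -> List[List[float]]:
    """Toy feature extraction (hypothetical): raw audio [T] -> features [T', 2].

    Each frame yields (log energy, mean), purely for illustration.
    """
    frames = []
    for start in range(0, len(samples) - win + 1, hop):
        window = samples[start : start + win]
        energy = sum(x * x for x in window) / win
        mean = sum(window) / win
        frames.append([math.log(energy + 1e-10), mean])
    return frames


raw_audio = [0.0, 0.5, -0.5, 1.0, 0.25, -0.25, 0.0, 0.5]  # shape [8]
features = frame_features(raw_audio)  # shape [3, 2]: one more dimension than the input
```

Whatever mechanism is chosen, it would have to declare this new output shape somewhere, since the wrapped dataset only knows about the 1-D raw audio.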
@JackTemaki
Collaborator

I would be very interested in this functionality, but I have not yet put much thought into how it would look best. I am definitely a fan of using MetaDataset for everything, so I would not mind if it were part of that.

@curufinwe
Collaborator

Commenting on your 3 suggestions in order:

  1. Not very scalable, having to add this to every dataset makes it unnecessarily complicated.
  2. The dataset classes contain more logic than just returning a given sequence. They also contain logic for sequence ordering etc. This is not needed for a postprocessing pipeline. One could make a dataset for post-processing which takes an arbitrary function as input, but then you still have to nest your datasets correctly and the dataset dict will have a lot of nesting. That's not nice, but maybe of secondary concern.
  3. I like the idea of having a separate postprocessing function that is applied to the dataset output independent of the dataset most. It's orthogonal to the data-loading logic and does not make the dataset dict more complicated than it is right now. Only disadvantage I can see is that it adds a new global key to the config that users need to be aware of to use. They don't find out about this by just browsing through the list of available datasets or the documentation of a particular dataset.

@albertz
Member Author

albertz commented Jan 31, 2024

  1. Not very scalable, having to add this to every dataset makes it unnecessarily complicated.

Well, you would factor this out, to have the logic in some common class PostProcessing, and it would be very easy to integrate this into an existing dataset. Similarly like Vocabulary or ExtractAudioFeatures is currently also easy to integrate. And as a starting point, maybe only HDFDataset, OggZipDataset and/or MetaDataset would have this.

But yes, I kind of agree.

  2. The dataset classes contain more logic than just returning a given sequence. They also contain logic for sequence ordering etc.

Well, we do use combinations/transformations of datasets already, e.g. see MetaDataset, CombinedDataset, ConcatSeqsDataset, etc. Only one of the datasets performs the sequence ordering then, and the other datasets will be given a predefined sequence order list. The API was specifically extended to allow that (init_seq_order(self, epoch=None, seq_list=None, seq_order=None)) and all datasets we use support this well.

So, such postprocessing logic would not really add anything new there - it fits very naturally into how MetaDataset and co work currently.
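
The pattern described here can be sketched with toy classes. These are not the RETURNN classes: `ToyDataset` and `ToyWrapperDataset` are hypothetical stand-ins, and the `seq_order` argument of the real `init_seq_order` is left out for brevity. One dataset decides the order, and the others just follow the predefined `seq_list`.

```python
import random
from typing import List, Optional


class ToyDataset:
    """Stand-in for a dataset which can order sequences itself or accept a given order."""

    def __init__(self, seqs: dict):
        self.seqs = seqs  # seq tag -> data
        self.order: List[str] = []

    def init_seq_order(self, epoch: Optional[int] = None, seq_list: Optional[List[str]] = None) -> None:
        if seq_list is not None:
            # Predefined order given by a wrapping dataset: just follow it.
            self.order = list(seq_list)
        else:
            # Own ordering logic: here a shuffle seeded by the epoch.
            self.order = sorted(self.seqs)
            random.Random(epoch).shuffle(self.order)

    def get_data(self, idx: int):
        return self.seqs[self.order[idx]]


class ToyWrapperDataset:
    """Stand-in for a MetaDataset-like wrapper: one dataset controls the order."""

    def __init__(self, control: ToyDataset, others: List[ToyDataset]):
        self.control = control
        self.others = others

    def init_seq_order(self, epoch: Optional[int] = None) -> None:
        self.control.init_seq_order(epoch=epoch)
        for ds in self.others:
            ds.init_seq_order(epoch=epoch, seq_list=self.control.order)


audio = ToyDataset({"seq-a": [0.1, 0.2], "seq-b": [0.3], "seq-c": [0.4, 0.5, 0.6]})
text = ToyDataset({"seq-a": "hello", "seq-b": "hi", "seq-c": "good morning"})
wrapper = ToyWrapperDataset(control=audio, others=[text])
wrapper.init_seq_order(epoch=1)
```

A post-processing dataset would sit in the same position as the wrapper: it forwards the ordering call to the wrapped dataset and only transforms the data that comes back.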

  3. separate postprocessing function ... Only disadvantage I can see is that it adds a new global key

There is also another aspect which becomes ambiguous: The extern_data. Does this describe the data before or after the post processing? I.e. does this describe the data which comes out of the dataset, or the data which goes into the network/model? I think there are multiple places in our code which assume this to be the same. E.g. we have some checks that the dataset output is compatible with extern_data. And then, we create such extern_data with the actual batched data to feed it to the model (in TF via feeding the extern_data placeholders, in PT just directly).

Note, we also have the model_outputs, which describes what comes out of the model. So maybe, if there is a custom post-process function, it requires that model_inputs is also defined, and extern_data describes the dataset output? I don't know... Or if the user does not specify model_inputs, it would assume the same format as extern_data (for many post-processing functions, e.g. speed perturbation etc, it would not change the format of the data).

@curufinwe
Collaborator

Regarding 1:
OK, I guess we agree not to do it this way.
Regarding 2:
My main worry is that readability will suffer if we have too many nested datasets, i.e. a PostProcessingDataset wrapping a MetaDataset wrapping multiple HDFDatasets.
Regarding 3:
extern_data should describe the data after preprocessing I think. It's the only type/shape that the model sees. Regarding the checks: I guess they need to be reworked, but there shouldn't be many places where that happens, right?

@albertz
Member Author

albertz commented Feb 1, 2024

@JackTemaki argued, he (and many others) anyway use MetaDataset, so if this is part of MetaDataset, not much would change in terms of config complexity for them.

Regarding extern_data, yes, probably you are right, and mostly extern_data describes the input to the model, and there are not many places where it checks that the dataset output matches it.

So, what would that variant look like? Just a function in the config, like:

def dataset_post_process(data: TensorDict) -> TensorDict: ...

?
But we have the train dataset, dev/eval datasets, and potentially also other datasets (forwarding, search, some custom user script, ...). Should the same post processing be used in all cases? Or should this be a property per dataset?

@curufinwe
Collaborator

We definitely need to distinguish train and dev datasets. If we do data augmentation for training we don't necessarily want to do it for cross-validation. If we are doing some sort of format conversion then it would be needed for both. So in the end this should be a user choice. We can also have the type of dataset (train/dev/...) as one argument to the post-processing function.

@albertz
Member Author

albertz commented Feb 1, 2024

How would the user specify such post-processing function per dataset?

It could be another argument for the dataset itself, so the user specifies it like:

train = {
   ...,
   "post_process": my_train_dataset_post_proc,
}
dev = {
   ...,
   "post_process": my_dev_dataset_post_proc,
}

It's a bit ugly, because Dataset itself then would not use this attribute at all, but only outside code would use it. But on the other hand, if the user would specify it somehow outside of the dataset, as a separate config option, this could easily lead to errors, as you don't necessarily have a well-defined name for the dataset. E.g. like:

dataset_post_process_funcs = {
    "train": my_train_dataset_post_proc,
    "dev": my_dev_dataset_post_proc,
}

This is maybe fine for the training task, but for search or forward, it's ambiguous, and also it doesn't really work if RETURNN is used for scripting and not as a standalone tool.
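
The ambiguity can be sketched as follows. Everything here is hypothetical: `select_post_process` is not an existing RETURNN function, and `SeqData` is a plain-dict stand-in for TensorDict. The lookup works as long as the caller has a name such as "train" or "dev", but a dataset passed directly to a forward or search call, or created in a user script, has no such name.

```python
from typing import Callable, Dict, Optional

SeqData = Dict[str, list]  # stand-in for TensorDict
PostProcFunc = Callable[[SeqData], SeqData]


def identity(data: SeqData) -> SeqData:
    return data


def add_augmentation_marker(data: SeqData) -> SeqData:
    # Placeholder for some train-only augmentation.
    return {**data, "augmented": [1]}


dataset_post_process_funcs: Dict[str, PostProcFunc] = {
    "train": add_augmentation_marker,
    "dev": identity,
}


def select_post_process(dataset_name: Optional[str]) -> PostProcFunc:
    """Hypothetical lookup by dataset name; raises if there is no usable name."""
    if dataset_name is None or dataset_name not in dataset_post_process_funcs:
        raise KeyError(f"no post-processing function for dataset {dataset_name!r}")
    return dataset_post_process_funcs[dataset_name]
```

With `select_post_process("train")` the choice is clear, while `select_post_process(None)` (a dataset object without a config name) has no sensible answer, which is the failure mode described above.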

@curufinwe
Collaborator

Could we add this to (every) engine class? The engine knows what kind of task it performs and what dataloader it uses for that task and could pick the correct post-processing function for that task.

@albertz
Member Author

albertz commented Feb 2, 2024

The post-processing function is not per task but per dataset. At least that is what I wrote above.

Or do you want to have it per task? But I guess you don't really want it per task, but rather depending on whether you train or eval? Or maybe a generic dataset_post_process_func, but there is always an additional extra argument train, which specifies the train flag (similar to how dropout etc. work, depending on this flag)? So like this:

def dataset_post_process(data: TensorDict, *, train: bool = False) -> TensorDict: ...
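
A minimal sketch of how a user function with this signature might use the flag, assuming a plain dict of float lists as a stand-in for TensorDict. The int16 scaling and the random gain are made-up examples: the format conversion runs in both modes, the augmentation only when `train` is set.

```python
import random
from typing import Dict, List

SeqData = Dict[str, List[float]]  # stand-in for TensorDict


def dataset_post_process(data: SeqData, *, train: bool = False) -> SeqData:
    """Format conversion always, augmentation only in training."""
    # Format conversion (needed for train and dev): scale int16-like samples to [-1, 1].
    audio = [x / 32768.0 for x in data["data"]]
    if train:
        # Toy augmentation (train only): random gain.
        # Real speed perturbation would resample and change the length.
        gain = random.uniform(0.9, 1.1)
        audio = [x * gain for x in audio]
    return {**data, "data": audio}


seq = {"data": [16384.0, -32768.0, 0.0]}
dev_out = dataset_post_process(seq)
train_out = dataset_post_process(seq, train=True)
```

This mirrors how dropout behaves: one function, with the caller deciding which mode applies.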

@curufinwe
Collaborator

Sorry, I was not precise enough. What I meant was that in the engine class you know for what the dataset is used and from which name in the config it comes from (I hope). Then one can select the correct postprocessing function to go with it (by picking it from the dict you showed above with the correct key).
But that might indeed be too complicated and having the train flag could be already enough.

@albertz
Member Author

albertz commented Feb 2, 2024

in the engine class you know for what the dataset is used and from which name in the config it comes from (I hope)

No, you don't. E.g. we have this API for forward:

def forward_with_callback(self, *, dataset: Dataset, callback: ForwardCallbackIface): ...

Or this API for the init (including training):

    def init_train_from_config(
        self,
        config: Optional[Config] = None,
        train_data: Optional[Dataset] = None,
        dev_data: Optional[Dataset] = None,
        eval_data: Optional[Dataset] = None,
    ): ...

It is handled by our __main__ to init the datasets here (train_data etc).
(Although, it's a bit inconsistent. The engine anyway has a reference to the config, and in init_train_from_config, it also checks eval_datasets directly from the config, to potentially init further eval datasets.)

But yes, so I guess we can simply use this API:

def dataset_post_process(data: TensorDict, *, train: bool = False) -> TensorDict: ...

@curufinwe
Collaborator

OK, let's use
def dataset_post_process(data: TensorDict, *, train: bool = False) -> TensorDict: ...

@albertz
Member Author

albertz commented Feb 6, 2024

One aspect I realized now: Where exactly would this be executed? As this is now outside the dataset, MultiProcDataset cannot really make use of this, so it cannot be parallelized over the workers of MultiProcDataset. That's a bit suboptimal, at least for my setup. But it can still run somewhere inside the PyTorch pipeline, i.e. then be parallelized over the PyTorch DataLoader worker instances, although most people don't parallelize there (num_workers 0 or 1).

@vieting
Contributor

vieting commented Feb 21, 2024

I would also be interested in this feature. The discussed post processing solution seems fine to me. However, I would definitely like to have the post processing parallelizable into multiple procs. At least now, I have a setup with an OggZipDataset using pre_process which is wrapped by a MultiProcDataset. The MultiProcDataset gave a clear improvement in terms of computing time and therefore general speedup of training, so this would be very helpful. Maybe we could additionally allow specifying dataset_post_process_opts, which could contain similar args to those MultiProcDataset takes and would automatically apply a similar multiprocessing logic.

@albertz
Member Author

albertz commented Apr 11, 2024

Another aspect came up (@Judyxujj): We were interested in implementing mixup in this post processing function. But this is not really possible with the current design. This additionally needs:

  • Some way to store state (the buffer of past sequences)
  • Extra code to synchronize states (in distributed training settings, but also when multi processing is done)
  • Store the state on disk? Or just reset it at the beginning of each epoch?

(Note, I have a mixup implementation, but I did it inside the model, via a nn.Module (or rf.Module). The state is handled in the same way as e.g. nn.BatchNorm.)

@NeoLegends NeoLegends self-assigned this Jul 17, 2024
@NeoLegends
Collaborator

NeoLegends commented Jul 22, 2024

If we limited this feature to PyTorch we could also offer the user to inject custom DataPipes into the already existing pipeline instead of providing a callback-style API. That would interact favourably with multiprocessing, offer places to store state for e.g. Mixup, and just as well allows syncing data across workers, maybe given some setup or communication primitive to bootstrap further communication through.

@vieting Are you using torch or TF in your current setups?

@albertz
Member Author

albertz commented Jul 22, 2024

That would interact favourably with multiprocessing,

No, not really. Only the DataLoader multiproc would apply here, which is usually just a single proc. But we want to have multiple procs here.

offer places to store state for Mixup,

I don't understand. Where? How? I don't think that a data pipe should really have state (except for temporary state which we would reset every subepoch).

Having state also means that you properly store/restore the state on disk after a restart, like the model parameters or optimizer state.

and just as well allows syncing data across workers,

No, how? Every worker on the dataset is independent from each other. They don't have any way to communicate with each other.

maybe given some setup or communication primitive.

I don't see any simple way to add this on this level.

In any case, we should probably not overthink/overengineer this. E.g., for things like mixup, I think it's ok if the state is reset at the beginning of an epoch, and also if it's just local to the current worker. Otherwise mixup can just be done on the model level, which we have already implemented. And most other things you would want to do in such post processing don't have state.

Also, I tend to think, it's ok to have multiple solutions here, and to see what is easiest for the user.

  • E.g., I think one of the original requests for this feature came from @vieting, who had a HDF with raw audio, and he wanted to apply speed perturbation, in the same way we do in the OggZipDataset. The original idea was just to add it in the same way as in OggZipDataset. It would be simple to implement, and also support multiprocessing via the MultiProcDataset, just the same way we do it for OggZipDataset.

  • Having this in the config:

    def dataset_post_process(data: TensorDict, *, train: bool = False) -> TensorDict: ...

    That's also not too hard, but adds a bit more complexity:

    • It needs to be added to every engine (TF, PT, whatever else we would have in the future).
    • Maybe other code which operates on a dataset should also add this.
    • For multiprocessing support, we need to add some further logic here, sth like dataset_post_process_opts.
  • Allow the user to customize the PyTorch data pipeline (see PyTorch Engine._create_data_loader).

  • Doing it just inside the model code. This is always possible, and for some cases already implemented like that, e.g. for SpecAugment and Mixup.

  • Having maybe a separate PostProcessingDataset. But so far not much interest in that. But simple to implement.

@NeoLegends
Collaborator

No, not really. Only the DataLoader multiproc would apply here, which is usually just a single proc. But we want to have multiple procs here.

I was under the assumption that in RETURNN+PT the data loader num_workers is basically a replacement for MultiProcDataset. I.e. in the cases where I want to use more than one core for data loading I'd set num_workers > 1 to shard the load across cores. Is this wrong?

@albertz
Member Author

albertz commented Jul 24, 2024

I was under the assumption that in RETURNN+PT the data loader num_workers is basically a replacement for MultiProcDataset. I.e. in the cases where I want to use more than one core for data loading I'd set num_workers > 1 to shard the load across cores. Is this wrong?

Yes, this is likely wrong. We never really tested this, but: There is no sharding implemented for DataLoader multiple workers. It cannot be: There is no way you can do sharding in general for any dataset (or only in an inefficient way by iterating through all data and discarding what you don't want). The only dataset which can do sharding properly is DistributeFilesDataset. But I'm pretty sure it would also not properly work with DataLoader multiple workers, as we never implemented anything special for that. I think (but also not sure) every worker should get a different random seed, and thus get different data, similar as in multi GPU training. Or maybe even not that, and it's just not properly handled.

But even with DistributeFilesDataset, I'm not sure if that works well together. DistributeFilesDataset assumes that it does the sharding, and that it is not itself already sharded. It definitely needs some extra logic for that. I'm also not really sure what happens when you reach the end in one worker. Does it stop immediately, or does it continue until all workers have run out of data?

On the other hand, MultiProcDataset properly handles that. It first gets a list of all sequences of the current (sub)epoch, and then distributes that list among all the workers.
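
The idea of distributing one sequence list among workers can be sketched like this. This is not the actual MultiProcDataset code: the round-robin split and the helper names `distribute_seq_list` and `merge_round_robin` are assumptions for illustration. Each worker only loads its own part, so nothing is loaded and then discarded.

```python
from typing import List


def distribute_seq_list(seq_list: List[str], num_workers: int) -> List[List[str]]:
    """Split the (sub)epoch's sequence list so that each worker gets a disjoint part."""
    return [seq_list[i::num_workers] for i in range(num_workers)]


def merge_round_robin(parts: List[List[str]]) -> List[str]:
    """Read from the workers in turn to restore the original order."""
    merged = []
    for idx in range(max(len(p) for p in parts)):
        for part in parts:
            if idx < len(part):
                merged.append(part[idx])
    return merged


seq_list = [f"seq-{i}" for i in range(7)]
parts = distribute_seq_list(seq_list, num_workers=3)
restored = merge_round_robin(parts)
```

This only works because the full sequence list is known up front, which is what a generic DataLoader worker does not have.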

@albertz
Member Author

albertz commented Jul 24, 2024

Btw, after some discussion yesterday with @curufinwe, I think a pragmatic simple solution for now is really to implement this as a new separate dataset, like this PostProcessingDataset. This directly solves the issues with parallelization of the post processing (it lives together with the dataset, so covered by those multiple workers) and it doesn't really require any extra logic in RETURNN, so it is the simplest solution with the least amount of complexity. The only downside is that this makes the config maybe harder to read, but I'm not sure if this is really such a big issue.
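
To give an idea of the nesting this implies, a config could look roughly like the sketch below. The class name PostprocessingDataset is the one later merged in #1596; the option names (`map_seq`, `num_workers`, `buffer_size`, `files`) and the file name are assumptions for illustration and should be checked against the RETURNN documentation.

```python
def my_post_process(data, **kwargs):
    """Per-sequence post-processing, e.g. speed perturbation. No-op placeholder here."""
    return data


train = {
    "class": "MultiProcDataset",  # parallelizes everything nested inside
    "num_workers": 4,
    "buffer_size": 10,
    "dataset": {
        "class": "PostprocessingDataset",
        "map_seq": my_post_process,  # option name is an assumption, see above
        "dataset": {
            "class": "HDFDataset",
            "files": ["train-raw-audio.hdf"],  # hypothetical file name
        },
    },
}
```

Because the post-processing sits inside the MultiProcDataset, it runs in the worker processes without any extra logic in the engine.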

@NeoLegends
Collaborator

With those points I agree this is the simplest way to move forward, thanks for the explanations!

@albertz
Member Author

albertz commented Jul 24, 2024

For some other examples of similar processing datasets, see: VariableDataset, MetaDataset, AnythingDataset, ConcatSeqsDataset.

Btw, in the main post, I extended a bit the list of example post processing functions. One important type is also to support concatenating sequences (see #1573). I.e. it means the post processing transformation is not necessarily only on a single individual sequence, but could also do things like concatenating sequences, maybe shuffle sequences, drop sequences, insert new sequences, etc. However, this should be implemented in a streaming way, i.e. it gets in a sequence of TensorDict, and should output a new sequence of TensorDict.

@albertz albertz changed the title from "HDFDataset (or generic dataset) post processing" to "Dataset post processing" Jul 24, 2024
@albertz
Member Author

albertz commented Jul 24, 2024

However, this should be implemented in a streaming way, i.e. it gets in a sequence of TensorDict, and should output a new sequence of TensorDict.

The question is a bit, how to define the API for the user then.

Before, we suggested:

def dataset_post_process(data: TensorDict, *, train: bool = False) -> TensorDict: ...

(This would now be an argument for our PostProcessingDataset.)
(The train flag is then also not necessary anymore.)

Maybe we can still also provide this simpler API, as in many cases, the user wants to transform a single TensorDict only. So maybe the option post_process_seq: Callable[[TensorDict], TensorDict] or so.

But then, it should also support the operations over multiple TensorDicts. This could be such an option/API:

post_process_stream: Callable[[Iterator[TensorDict]], Iterator[TensorDict]]

The user can simply implement this as a generator, like so:

from typing import Iterator
from returnn.tensor import TensorDict

def no_op_post_process_stream(input_stream: Iterator[TensorDict]) -> Iterator[TensorDict]:
    for data in input_stream:
        yield data
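
As a sketch of how the two proposed options relate, assuming plain dicts of lists instead of TensorDict: a per-sequence function can always be lifted into a stream function, while something like concatenating consecutive sequences needs the stream form. The helper names `lift_to_stream` and `concat_pairs_stream` are made up for this example.

```python
from typing import Callable, Dict, Iterator, List

SeqData = Dict[str, List[int]]  # stand-in for TensorDict


def lift_to_stream(
    post_process_seq: Callable[[SeqData], SeqData],
) -> Callable[[Iterator[SeqData]], Iterator[SeqData]]:
    """Turn a per-sequence function into a stream function."""

    def post_process_stream(input_stream: Iterator[SeqData]) -> Iterator[SeqData]:
        for data in input_stream:
            yield post_process_seq(data)

    return post_process_stream


def concat_pairs_stream(input_stream: Iterator[SeqData]) -> Iterator[SeqData]:
    """Concatenate every two consecutive sequences; a leftover sequence is passed through."""
    pending = None
    for data in input_stream:
        if pending is None:
            pending = data
            continue
        yield {key: pending[key] + data[key] for key in pending}
        pending = None
    if pending is not None:
        yield pending


seqs = [{"data": [1, 2]}, {"data": [3]}, {"data": [4, 5]}]
concatenated = list(concat_pairs_stream(iter(seqs)))
doubled = list(lift_to_stream(lambda d: {"data": [2 * x for x in d["data"]]})(iter(seqs)))
```

Both are generators, so nothing is materialized beyond the one pending sequence, which keeps the processing streaming.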

albertz added a commit that referenced this issue Jul 30, 2024
Closes #1505

Co-authored-by: Albert Zeyer <albzey@gmail.com>
@albertz
Member Author

albertz commented Jul 30, 2024

The PostprocessingDataset from #1596 is merged now. It should allow doing all of the examples discussed here.

Except that it should not have state, but that's not really so much a problem. You can still implement sth like mixup. You only should reset any internal state at the beginning of an epoch.
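
A minimal sketch of what such a mixup-like stream function with epoch-local state could look like. It assumes that the stream function is called once per epoch with a fresh iterator, and it uses plain dicts of float lists as a stand-in for TensorDict. The name `mixup_stream` and its parameters are hypothetical, and the mixing itself is a toy (weighted sum of raw values), not the mixup implementation mentioned earlier in the thread.

```python
import random
from collections import deque
from typing import Deque, Dict, Iterator, List

SeqData = Dict[str, List[float]]  # stand-in for TensorDict


def mixup_stream(
    input_stream: Iterator[SeqData], *, buffer_size: int = 100, mix_weight: float = 0.3, seed: int = 0
) -> Iterator[SeqData]:
    """Mix each sequence with a random earlier one from a local buffer."""
    rng = random.Random(seed)
    # Local state: created on every call, so it starts empty in every epoch
    # (assuming one call per epoch) and is private to the current worker.
    buffer: Deque[List[float]] = deque(maxlen=buffer_size)
    for data in input_stream:
        audio = data["data"]
        if buffer:
            other = rng.choice(list(buffer))
            n = min(len(audio), len(other))
            mixed = [(1 - mix_weight) * a + mix_weight * b for a, b in zip(audio[:n], other[:n])]
            mixed += audio[n:]  # keep the tail if the current sequence is longer
        else:
            mixed = list(audio)  # nothing to mix with yet
        buffer.append(audio)  # store the clean audio, not the mixed one
        yield {**data, "data": mixed}


epoch_seqs = [{"data": [1.0, 1.0]}, {"data": [0.0, 0.0, 0.0]}]
out = list(mixup_stream(iter(epoch_seqs)))
```

Since the buffer lives inside the generator call, there is nothing to store on disk or to synchronize between workers, which matches the restriction described above.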
