Re: [Proposal] Add a static PTransform.compose() method for composing transforms in a lambda expression

If we don't hear much from users, I would be for merging the change as long as it is marked @Experimental until we get future feedback on its usage.

On Wed, Sep 19, 2018 at 2:19 PM Jeff Klukas <jklukas@xxxxxxxxxxx> wrote:
Thanks for the thoughts, Lukasz. These are exactly the kinds of issues I was hoping to get context on, since I don't yet have extensive experience with Beam.

I have not yet run into issues where the output coder was not able to be inferred. I expect this may be a non-issue, as the individual transforms used within a user-provided lambda expression would presumably expose the ability to specify a coder.

I don't have enough context yet to comment on whether display data might be an issue, so I do hope the user list can provide input there.

On Wed, Sep 19, 2018 at 4:52 PM Lukasz Cwik <lcwik@xxxxxxxxxx> wrote:
Thanks for the proposal and it does seem to make the API cleaner to build anonymous composite transforms.

In your experience have you had issues where the API doesn't work out well because the PTransform:
* is not able to override how the output coder is inferred?
* can't supply display data?

+user@xxxxxxxxxxxxxxx, do users think that the provided API would be useful enough to be added to the core SDK, or would the new method add noise to or detract from the existing API?

On Mon, Sep 17, 2018 at 12:57 PM Jeff Klukas <jklukas@xxxxxxxxxxx> wrote:
I've gone ahead and filed a JIRA Issue and GitHub PR to follow up on this suggestion and make it more concrete:

On Fri, Sep 14, 2018 at 1:42 PM Jeff Klukas <jklukas@xxxxxxxxxxx> wrote:
Hello all, I'm a data engineer at Mozilla working on a first project using Beam. I've been impressed with the usability of the API as there are good built-in solutions for handling many simple transformation cases with minimal code, and wanted to discuss one bit of ergonomics that seems to be missing.

It appears that none of the existing PTransform factories are generic enough to take in or output a PCollectionTuple, but we've found many use cases where it's convenient to apply a few transforms on a PCollectionTuple in a lambda expression.

For example, we've defined several PTransforms that return main and error output streams bundled in a PCollectionTuple. We defined a CompositeTransform interface so that we could handle the error output in a lambda expression like:

    .apply("attempt to deserialize messages", new MyDeserializationTransform())
    .apply("write deserialization errors",
        CompositeTransform.of((PCollectionTuple input) -> {
            input.get(errorTag).apply(new MyErrorOutputTransform());
            return input.get(mainTag);
        }))
    .apply("more processing on the deserialized messages", new MyOtherTransform())

I'd be interested in contributing a patch to add this functionality, perhaps as a static method PTransform.compose(). Would that patch be welcome? Are there other thoughts on naming?
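To make the shape of such a static factory concrete, here is a minimal, self-contained sketch in plain Java. It does not use Beam: `Transform` and `Outputs` are hypothetical stand-ins for PTransform and PCollectionTuple, and `compose` only illustrates the general pattern of wrapping a lambda in an anonymous subclass. It should not be read as the signature or behavior of the eventual Beam method.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class ComposeSketch {

  /** Hypothetical stand-in for PTransform: an abstract class with a single expand method. */
  abstract static class Transform<InputT, OutputT> {
    abstract OutputT expand(InputT input);

    /** Static factory that adapts a lambda into an anonymous Transform subclass. */
    static <InputT, OutputT> Transform<InputT, OutputT> compose(
        Function<InputT, OutputT> fn) {
      return new Transform<InputT, OutputT>() {
        @Override
        OutputT expand(InputT input) {
          return fn.apply(input);
        }
      };
    }
  }

  /** Hypothetical stand-in for a PCollectionTuple carrying a main and an error output. */
  static final class Outputs {
    final List<String> main;
    final List<String> errors;

    Outputs(List<String> main, List<String> errors) {
      this.main = main;
      this.errors = errors;
    }
  }

  public static void main(String[] args) {
    List<String> errorSink = new ArrayList<>();
    Outputs outputs =
        new Outputs(Arrays.asList("a", "b"), Arrays.asList("bad record"));

    // Handle the error output as a side branch and pass the main output through.
    Transform<Outputs, List<String>> handleErrors =
        Transform.compose((Outputs in) -> {
          errorSink.addAll(in.errors);
          return in.main;
        });

    List<String> main = handleErrors.expand(outputs);
    System.out.println(main + " " + errorSink); // prints "[a, b] [bad record]"
  }
}
```

The factory lives on the abstract class itself, so callers never need to name a separate interface; that is the main ergonomic difference from the CompositeTransform.of() approach described here.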

The full code of the CompositeTransform interface we're currently using is included below.

public interface CompositeTransform<InputT extends PInput, OutputT extends POutput> {
  OutputT expand(InputT input);

  /**
   * The public factory method that serves as the entrypoint for users to create a composite PTransform.
   */
  static <InputT extends PInput, OutputT extends POutput>
        PTransform<InputT, OutputT> of(CompositeTransform<InputT, OutputT> transform) {
    return new PTransform<InputT, OutputT>() {
      public OutputT expand(InputT input) {
        return transform.expand(input);