[DISCUSS][New API][WIP] Some code suggestion #2127

Closed
13 of 15 tasks
lhyundeadsoul opened this issue Jul 4, 2022 · 12 comments

Comments

@lhyundeadsoul
Contributor

lhyundeadsoul commented Jul 4, 2022

Code of Conduct

Search before asking

  • I have searched the issues and found no similar issues.

Describe the proposal

Here are some code suggestions for the implementation of the new connector API. They may help make the code more understandable, unambiguous, and reasonable.
Please let me know if I have misunderstood the code design. Thanks.

Task list

  • SingleSplitReaderContext
  • SeaTunnelRuntimeEnvironment
  • org.apache.seatunnel.api.source.SourceSplitEnumerator.Context
    • registeredReaders, assignSplit(int, java.util.List<SplitT>) and signalNoMoreSplits(int subtask) seem to belong to the SplitEnumerator's behavior, not the Context (I think the Context should only provide execution environment information).
    • registeredReaders returns Set<Integer>; how about Set<SourceReader>?
  • org.apache.seatunnel.api.serialization.Serializer
    • Serializer doesn't need to be Serializable; it won't be transmitted over the network.
  • org.apache.seatunnel.api.source.SourceReader
    • org.apache.seatunnel.api.source.SourceReader.Context#sendSourceEventToCoordinator: does "Coordinator" mean SourceSplitEnumerator here? If so, we'd better use one name for this concept in all occurrences.
    • org.apache.seatunnel.api.source.SourceReader#snapshotState looks similar to org.apache.seatunnel.api.source.SourceSplitEnumerator#snapshotState, but the two return different types (List<SplitT> and StateT). The comment describes the return value as the split checkpoint state, while it actually returns List<SplitT>; in my mind these are not the same. Could we unify the snapshot behavior?

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!
@Hisoka-X
Member

Hisoka-X commented Jul 5, 2022

@ashulin Hi, PTAL

@lhyundeadsoul lhyundeadsoul changed the title from "[Umbrella][WIP] Some code suggestion" to "[Umbrella][New API][WIP] Some code suggestion" Jul 5, 2022
@lhyundeadsoul lhyundeadsoul closed this as not planned Jul 5, 2022
@lhyundeadsoul lhyundeadsoul reopened this Jul 5, 2022
@lhyundeadsoul
Contributor Author

cc @ruanwenjun @CalvinKirs

@lhyundeadsoul lhyundeadsoul changed the title from "[Umbrella][New API][WIP] Some code suggestion" to "[DISCUSS][New API][WIP] Some code suggestion" Jul 6, 2022
@ashulin
Member

ashulin commented Jul 6, 2022

org.apache.seatunnel.api.source.SourceSplitEnumerator.Context#assignSplit and #signalNoMoreSplits are indeed behaviors that belong to SourceSplitEnumerator. They are placed in the Context to hide their internal implementation, and I don't have a better way to implement this.
In my opinion, SourceSplitEnumerator.Context#registeredReaders shouldn't return reader instances, because that may mislead developers into using the readers directly; so currently it only returns some information about the readers.
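To make the trade-off concrete, here is a minimal sketch of the design described above, using simplified stand-ins rather than the real SeaTunnel interfaces: the enumerator only ever sees subtask IDs from registeredReaders() and hands splits to the Context, never to a reader object. EnumeratorContext, InMemoryContext, RoundRobinEnumerator and the String split type are hypothetical; a real engine would forward assignSplit to its runtime instead of recording it in a map.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Simplified stand-in for SourceSplitEnumerator.Context (not the real SeaTunnel interface).
interface EnumeratorContext<SplitT> {
    // Only subtask IDs are exposed, never reader instances.
    Set<Integer> registeredReaders();

    // The engine-specific implementation decides how the splits reach the reader.
    void assignSplit(int subtaskId, List<SplitT> splits);

    void signalNoMoreSplits(int subtaskId);
}

// Hypothetical in-memory context: records assignments instead of talking to an engine.
class InMemoryContext<SplitT> implements EnumeratorContext<SplitT> {
    final Map<Integer, List<SplitT>> assigned = new HashMap<>();
    final Set<Integer> finished = new TreeSet<>();

    InMemoryContext(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            assigned.put(i, new ArrayList<>());
        }
    }

    @Override
    public Set<Integer> registeredReaders() {
        return new TreeSet<>(assigned.keySet());
    }

    @Override
    public void assignSplit(int subtaskId, List<SplitT> splits) {
        assigned.get(subtaskId).addAll(splits);
    }

    @Override
    public void signalNoMoreSplits(int subtaskId) {
        finished.add(subtaskId);
    }
}

// Hypothetical enumerator logic: round-robin over the registered subtask IDs.
class RoundRobinEnumerator {
    static <SplitT> void run(EnumeratorContext<SplitT> context, List<SplitT> splits) {
        List<Integer> readers = new ArrayList<>(context.registeredReaders());
        for (int i = 0; i < splits.size(); i++) {
            int subtask = readers.get(i % readers.size());
            context.assignSplit(subtask, Collections.singletonList(splits.get(i)));
        }
        for (int subtask : readers) {
            context.signalNoMoreSplits(subtask);
        }
    }
}
```

Because the enumerator works with integers only, it has no way to call reader methods directly, which is the misuse the reply wants to prevent.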

@ashulin
Member

ashulin commented Jul 6, 2022

org.apache.seatunnel.api.serialization.Serializer
Serializer doesn't need to be Serializable; it won't be transmitted over the network.

I agree. In the current usage scenarios, serializing it is not required.
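A sketch of what that could look like, assuming a serializer contract that simply converts between objects and bytes. The interface below is a simplified stand-in (its method names are assumptions, not a quote of the SeaTunnel source), and StringSplitSerializer is hypothetical. Nothing in it extends java.io.Serializable, consistent with the point that the serializer itself is never sent over the network.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Simplified serializer contract that deliberately does NOT extend java.io.Serializable.
interface Serializer<T> {
    byte[] serialize(T obj) throws IOException;

    T deserialize(byte[] serialized) throws IOException;
}

// Hypothetical serializer for a split identified only by a String ID.
class StringSplitSerializer implements Serializer<String> {
    @Override
    public byte[] serialize(String obj) {
        return obj.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(byte[] serialized) {
        return new String(serialized, StandardCharsets.UTF_8);
    }
}
```

Only the bytes produced by serialize need to travel between processes; the serializer object stays local.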

@ashulin
Copy link
Member

ashulin commented Jul 6, 2022

org.apache.seatunnel.api.source.SourceReader.Context#sendSourceEventToCoordinator: does "Coordinator" mean SourceSplitEnumerator here? If so, we'd better use one name for this concept in all occurrences.

You have a point there. At the moment, the name leads to ambiguity. You're welcome to contribute a fix.
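As an illustration of what unifying the name might mean, here is a hypothetical version of the reader context in which the method is named after the enumerator. ReaderContext, sendSourceEventToEnumerator, SourceEvent, RequestSplitEvent and RecordingReaderContext are stand-ins chosen for this sketch, not necessarily the names the project settled on.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical marker type for events sent from a reader to the enumerator.
interface SourceEvent {}

// Hypothetical event: a reader asks the enumerator for more work.
class RequestSplitEvent implements SourceEvent {
    final int subtaskId;

    RequestSplitEvent(int subtaskId) {
        this.subtaskId = subtaskId;
    }
}

// Sketch of a reader context whose method names the receiver as the enumerator,
// rather than an otherwise undefined "coordinator".
interface ReaderContext {
    void sendSourceEventToEnumerator(SourceEvent event);
}

// Hypothetical test double that just records what was sent.
class RecordingReaderContext implements ReaderContext {
    final List<SourceEvent> sent = new ArrayList<>();

    @Override
    public void sendSourceEventToEnumerator(SourceEvent event) {
        sent.add(event);
    }
}
```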

@ashulin
Copy link
Member

ashulin commented Jul 6, 2022

org.apache.seatunnel.api.source.SourceReader#snapshotState looks similar to org.apache.seatunnel.api.source.SourceSplitEnumerator#snapshotState, but the two return different types (List<SplitT> and StateT). The comment describes the return value as the split checkpoint state, while it actually returns List<SplitT>; in my mind these are not the same. Could we unify the snapshot behavior?

SourceSplitEnumerator#snapshotState and SourceReader#snapshotState are different.
SourceSplitEnumerator plays the role of coordinator, so its state may need information beyond the splits.
SourceReader is designed to need only its splits to run, so its snapshot returns List<SplitT>.
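A toy sketch of that asymmetry, assuming String splits and hypothetical class names (EnumeratorState, SketchEnumerator, SketchReader): the enumerator's snapshot carries coordinator bookkeeping (pending splits plus which subtask owns what), while the reader's snapshot is just the list of splits it owns. The long checkpointId parameter is an assumption about the method shape, not taken from the thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical enumerator state (the StateT role): more than just splits.
class EnumeratorState {
    final List<String> pendingSplits;
    final Map<Integer, List<String>> assignedSplits;

    EnumeratorState(List<String> pendingSplits, Map<Integer, List<String>> assignedSplits) {
        this.pendingSplits = pendingSplits;
        this.assignedSplits = assignedSplits;
    }
}

class SketchEnumerator {
    final List<String> pending = new ArrayList<>();
    final Map<Integer, List<String>> assigned = new HashMap<>();

    // Plays the role of SourceSplitEnumerator#snapshotState returning StateT.
    EnumeratorState snapshotState(long checkpointId) {
        // Deep-copy so later mutations don't leak into the snapshot.
        Map<Integer, List<String>> copy = new HashMap<>();
        assigned.forEach((subtask, splits) -> copy.put(subtask, new ArrayList<>(splits)));
        return new EnumeratorState(new ArrayList<>(pending), copy);
    }
}

class SketchReader {
    final List<String> ownedSplits = new ArrayList<>();

    // Plays the role of SourceReader#snapshotState returning List<SplitT>:
    // the reader needs nothing but its splits to resume.
    List<String> snapshotState(long checkpointId) {
        return new ArrayList<>(ownedSplits);
    }
}
```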

@lhyundeadsoul
Contributor Author

org.apache.seatunnel.api.source.SourceReader#snapshotState looks similar to org.apache.seatunnel.api.source.SourceSplitEnumerator#snapshotState, but the two return different types (List<SplitT> and StateT). The comment describes the return value as the split checkpoint state, while it actually returns List<SplitT>; in my mind these are not the same. Could we unify the snapshot behavior?

SourceSplitEnumerator#snapshotState and SourceReader#snapshotState are different. SourceSplitEnumerator plays the role of coordinator, so its state may need information beyond the splits. SourceReader is designed to need only its splits to run, so its snapshot returns List<SplitT>.

In my opinion, SplitT is the type of SourceSplit, not of the checkpoint state. I wonder if List<StateT> would be more appropriate?

@lhyundeadsoul
Contributor Author

In my opinion, SourceSplitEnumerator.Context#registeredReaders shouldn't return reader instances, because that may mislead developers into using the readers directly; so currently it only returns some information about the readers.

OK. We can do it this way for now.

@ashulin
Member

ashulin commented Jul 6, 2022

org.apache.seatunnel.api.source.SourceReader#snapshotState looks similar to org.apache.seatunnel.api.source.SourceSplitEnumerator#snapshotState, but the two return different types (List<SplitT> and StateT). The comment describes the return value as the split checkpoint state, while it actually returns List<SplitT>; in my mind these are not the same. Could we unify the snapshot behavior?

SourceSplitEnumerator#snapshotState and SourceReader#snapshotState are different. SourceSplitEnumerator plays the role of coordinator, so its state may need information beyond the splits. SourceReader is designed to need only its splits to run, so its snapshot returns List<SplitT>.

In my opinion, SplitT is the type of SourceSplit, not of the checkpoint state. I wonder if List<StateT> would be more appropriate?

The List<SplitT> returned by SourceReader#snapshotState is passed to SourceSplitEnumerator#addSplitsBack. This lets the job keep running correctly when the parallelism of the source changes, so the reader's state must be convertible back into splits.
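The rescaling path can be simulated in a few lines. In this hypothetical sketch, reader snapshots taken at one parallelism are handed back through an addSplitsBack-style method and then redistributed round-robin at the new parallelism; RescalingEnumerator, RescaleDemo and the round-robin policy are assumptions for illustration, not SeaTunnel's actual assignment logic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical enumerator that redistributes splits returned by readers.
class RescalingEnumerator {
    private final List<String> pending = new ArrayList<>();

    // Modeled on an addSplitsBack(splits, subtaskId) method; this sketch ignores the
    // original owner because that subtask may not exist at the new parallelism.
    void addSplitsBack(List<String> splits, int subtaskId) {
        pending.addAll(splits);
    }

    // Assign all pending splits round-robin over the new parallelism.
    Map<Integer, List<String>> assign(int parallelism) {
        Map<Integer, List<String>> result = new TreeMap<>();
        for (int i = 0; i < parallelism; i++) {
            result.put(i, new ArrayList<>());
        }
        for (int i = 0; i < pending.size(); i++) {
            result.get(i % parallelism).add(pending.get(i));
        }
        pending.clear();
        return result;
    }
}

class RescaleDemo {
    // Restores reader snapshots taken at one parallelism and reassigns them at another.
    static Map<Integer, List<String>> restore(List<List<String>> readerSnapshots, int newParallelism) {
        RescalingEnumerator enumerator = new RescalingEnumerator();
        for (int oldSubtask = 0; oldSubtask < readerSnapshots.size(); oldSubtask++) {
            enumerator.addSplitsBack(readerSnapshots.get(oldSubtask), oldSubtask);
        }
        return enumerator.assign(newParallelism);
    }
}
```

For example, two readers that owned [a, b] and [c, d], restored at parallelism 3, yield {0=[a, d], 1=[b], 2=[c]}. This only works because each reader's snapshot is already expressed as splits.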

To avoid ambiguity, we can add

public interface SourceSplitState {
    SourceSplit toSplit();
}

and have SourceReader#snapshotState return List<SourceSplitState>.
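Here is a hedged sketch of how a connector might implement the proposed SourceSplitState. SourceSplit is reduced to a single splitId() method (a simplification, not the real interface), and OffsetSplit/OffsetSplitState model a hypothetical offset-based split: the reader mutates the state as it consumes records, and toSplit() produces a split that resumes from the current position.

```java
// Simplified stand-ins for the types named in the proposal.
interface SourceSplit {
    String splitId();
}

interface SourceSplitState {
    SourceSplit toSplit();
}

// Hypothetical immutable split: a partition read from a given start offset.
class OffsetSplit implements SourceSplit {
    final String partition;
    final long startOffset;

    OffsetSplit(String partition, long startOffset) {
        this.partition = partition;
        this.startOffset = startOffset;
    }

    @Override
    public String splitId() {
        return partition;
    }
}

// Hypothetical mutable reader-side state that tracks progress within the split.
class OffsetSplitState implements SourceSplitState {
    private final String partition;
    private long currentOffset;

    OffsetSplitState(OffsetSplit split) {
        this.partition = split.partition;
        this.currentOffset = split.startOffset;
    }

    // Called by the reader after consuming records.
    void advance(long records) {
        currentOffset += records;
    }

    // Converts progress back into a split that resumes where reading stopped.
    @Override
    public SourceSplit toSplit() {
        return new OffsetSplit(partition, currentOffset);
    }
}
```

This keeps the mutable progress tracking on the reader side while still giving addSplitsBack a plain split to work with.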

@lhyundeadsoul
Contributor Author

lhyundeadsoul commented Jul 6, 2022

org.apache.seatunnel.api.source.SourceSplitEnumerator.Context#assignSplit and #signalNoMoreSplits are indeed behaviors that belong to SourceSplitEnumerator. They are placed in the Context to hide their internal implementation, and I don't have a better way to implement this.

Now I see what you mean.
You also think #assignSplit and #signalNoMoreSplits should belong to SourceSplitEnumerator, but you don't want every subclass of SourceSplitEnumerator to have to implement them (because they share common logic).
Am I right?

If so, I think there is a solution that may help:

  1. Move #assignSplit and #signalNoMoreSplits to SourceSplitEnumerator.
  2. Create two abstract classes extending SourceSplitEnumerator (ParallelSourceSplitEnumerator and CoordinatedSourceSplitEnumerator) that encapsulate the common logic for the parallel and coordinated modes.
  3. Have the concrete enumerators (e.g. ClickhouseSourceSplitEnumerator, KafkaSourceSplitEnumerator) extend ParallelSourceSplitEnumerator or CoordinatedSourceSplitEnumerator.

The same applies to SourceReader.Context.
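A minimal sketch of the proposal, under the assumption that splits are Strings and that assignment is round-robin. SourceSplitEnumeratorBase stands in for SourceSplitEnumerator, only the ParallelSourceSplitEnumerator branch is shown (a CoordinatedSourceSplitEnumerator would follow the same pattern), and FakeKafkaEnumerator is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in for SourceSplitEnumerator with assignment moved onto the enumerator itself.
abstract class SourceSplitEnumeratorBase<SplitT> {
    abstract void assignSplit(int subtaskId, List<SplitT> splits);

    abstract void signalNoMoreSplits(int subtaskId);
}

// Hypothetical base class holding the logic shared by "parallel" sources.
abstract class ParallelSourceSplitEnumerator<SplitT> extends SourceSplitEnumeratorBase<SplitT> {
    protected final Map<Integer, List<SplitT>> assignments = new HashMap<>();
    protected final List<Integer> noMoreSplits = new ArrayList<>();

    @Override
    void assignSplit(int subtaskId, List<SplitT> splits) {
        assignments.computeIfAbsent(subtaskId, k -> new ArrayList<>()).addAll(splits);
    }

    @Override
    void signalNoMoreSplits(int subtaskId) {
        noMoreSplits.add(subtaskId);
    }

    // The only connector-specific part: which splits exist.
    abstract List<SplitT> discoverSplits();

    void run(int parallelism) {
        List<SplitT> splits = discoverSplits();
        for (int i = 0; i < splits.size(); i++) {
            List<SplitT> one = new ArrayList<>();
            one.add(splits.get(i));
            assignSplit(i % parallelism, one);
        }
        for (int i = 0; i < parallelism; i++) {
            signalNoMoreSplits(i);
        }
    }
}

// Hypothetical connector enumerator: implements split discovery only.
class FakeKafkaEnumerator extends ParallelSourceSplitEnumerator<String> {
    @Override
    List<String> discoverSplits() {
        List<String> partitions = new ArrayList<>();
        partitions.add("topic-0");
        partitions.add("topic-1");
        partitions.add("topic-2");
        return partitions;
    }
}
```

Note that assignSplit here only records the assignment in a local map; the sketch says nothing about how the split actually reaches a reader running in another process.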

@ashulin
Member

ashulin commented Jul 6, 2022

@lhyundeadsoul Take a look at the translation module. The interaction between SourceSplitEnumerator and SourceReader requires an intermediate class (Flink 1.14 hides the RPC in the Context), which abstract classes cannot provide.
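One way to read this: the connector's enumerator is written once against an interface, and each engine supplies the implementation that actually delivers splits (over RPC, in Flink's case). The sketch below models the engine's transport as a plain callback; SplitContext, EngineSplitContext and FileEnumerator are hypothetical, and the callback is a stand-in, not Flink's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Connector code depends only on this interface...
interface SplitContext<SplitT> {
    void assignSplit(int subtaskId, List<SplitT> splits);
}

// ...while each engine provides its own implementation. The callback stands in
// for whatever transport (e.g. RPC) the engine uses to reach the reader.
class EngineSplitContext<SplitT> implements SplitContext<SplitT> {
    private final BiConsumer<Integer, List<SplitT>> transport;

    EngineSplitContext(BiConsumer<Integer, List<SplitT>> transport) {
        this.transport = transport;
    }

    @Override
    public void assignSplit(int subtaskId, List<SplitT> splits) {
        transport.accept(subtaskId, splits);
    }
}

// A connector enumerator written once against the interface, reusable on any engine.
class FileEnumerator {
    private final SplitContext<String> context;

    FileEnumerator(SplitContext<String> context) {
        this.context = context;
    }

    void assignAll(List<String> files, int parallelism) {
        for (int i = 0; i < files.size(); i++) {
            List<String> one = new ArrayList<>();
            one.add(files.get(i));
            context.assignSplit(i % parallelism, one);
        }
    }
}
```

An abstract base class shipped with the connector API would not know which transport to call; injecting the engine-specific Context is what fills that gap.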

lhyundeadsoul added a commit to lhyundeadsoul/incubator-seatunnel that referenced this issue Jul 6, 2022
1. class name of SeaTunnelRuntimeEnvironment changes to SeaTunnelContextAware
2. Serializer doesn't need to extend Serializable
3. unify Enumerator concept
lhyundeadsoul added a commit to lhyundeadsoul/incubator-seatunnel that referenced this issue Jul 6, 2022
1. class name of SeaTunnelRuntimeEnvironment changes to SeaTunnelContextAware
2. Serializer doesn't need to extend Serializable
3. unify Enumerator concept
Hisoka-X pushed a commit that referenced this issue Jul 7, 2022
1. class name of SeaTunnelRuntimeEnvironment changes to SeaTunnelContextAware
2. Serializer doesn't need to extend Serializable
3. unify Enumerator concept
@Hisoka-X
Member

Closed by #2144.
