-
Notifications
You must be signed in to change notification settings - Fork 530
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[INLONG-10358][Sort] Make pulsar source support report audit informat…
…ion exactly once (#10511)
- Loading branch information
1 parent
ee5f93f
commit ada439c
Showing
9 changed files
with
1,364 additions
and
6 deletions.
There are no files selected for viewing
186 changes: 186 additions & 0 deletions
186
...rt-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSource.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,186 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.inlong.sort.pulsar.source; | ||
|
||
import org.apache.flink.annotation.Internal; | ||
import org.apache.flink.annotation.PublicEvolving; | ||
import org.apache.flink.api.common.typeinfo.TypeInformation; | ||
import org.apache.flink.api.connector.source.Boundedness; | ||
import org.apache.flink.api.connector.source.Source; | ||
import org.apache.flink.api.connector.source.SourceReader; | ||
import org.apache.flink.api.connector.source.SourceReaderContext; | ||
import org.apache.flink.api.connector.source.SplitEnumerator; | ||
import org.apache.flink.api.connector.source.SplitEnumeratorContext; | ||
import org.apache.flink.api.java.typeutils.ResultTypeQueryable; | ||
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; | ||
import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState; | ||
import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumStateSerializer; | ||
import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumerator; | ||
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor; | ||
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; | ||
import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber; | ||
import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator; | ||
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema; | ||
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchemaInitializationContext; | ||
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; | ||
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitSerializer; | ||
import org.apache.flink.core.io.SimpleVersionedSerializer; | ||
|
||
/** | ||
* The Source implementation of Pulsar. Please use a {@link PulsarSourceBuilder} to construct a | ||
* {@link PulsarSource}. The following example shows how to create a PulsarSource emitting records | ||
* of <code>String</code> type. | ||
* | ||
* <pre>{@code | ||
* PulsarSource<String> source = PulsarSource | ||
* .builder() | ||
* .setTopics(TOPIC1, TOPIC2) | ||
* .setServiceUrl(getServiceUrl()) | ||
* .setAdminUrl(getAdminUrl()) | ||
* .setSubscriptionName("test") | ||
* .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema())) | ||
* .setBounded(StopCursor::defaultStopCursor) | ||
* .build(); | ||
* }</pre> | ||
* | ||
* <p>See {@link PulsarSourceBuilder} for more details. | ||
* | ||
* @param <OUT> The output type of the source. | ||
* copy from {@link org.apache.flink.connector.pulsar.source.PulsarSource} | ||
* not modified | ||
*/ | ||
@PublicEvolving | ||
public final class PulsarSource<OUT> | ||
implements | ||
Source<OUT, PulsarPartitionSplit, PulsarSourceEnumState>, | ||
ResultTypeQueryable<OUT> { | ||
|
||
private static final long serialVersionUID = 7773108631275567433L; | ||
|
||
/** | ||
* The configuration for pulsar source, we don't support the pulsar's configuration class | ||
* directly. | ||
*/ | ||
private final SourceConfiguration sourceConfiguration; | ||
|
||
private final PulsarSubscriber subscriber; | ||
|
||
private final RangeGenerator rangeGenerator; | ||
|
||
private final StartCursor startCursor; | ||
|
||
private final StopCursor stopCursor; | ||
|
||
private final Boundedness boundedness; | ||
|
||
/** The pulsar deserialization schema used for deserializing message. */ | ||
private final PulsarDeserializationSchema<OUT> deserializationSchema; | ||
|
||
/** | ||
* The constructor for PulsarSource, it's package protected for forcing using {@link | ||
* PulsarSourceBuilder}. | ||
*/ | ||
PulsarSource( | ||
SourceConfiguration sourceConfiguration, | ||
PulsarSubscriber subscriber, | ||
RangeGenerator rangeGenerator, | ||
StartCursor startCursor, | ||
StopCursor stopCursor, | ||
Boundedness boundedness, | ||
PulsarDeserializationSchema<OUT> deserializationSchema) { | ||
this.sourceConfiguration = sourceConfiguration; | ||
this.subscriber = subscriber; | ||
this.rangeGenerator = rangeGenerator; | ||
this.startCursor = startCursor; | ||
this.stopCursor = stopCursor; | ||
this.boundedness = boundedness; | ||
this.deserializationSchema = deserializationSchema; | ||
} | ||
|
||
/** | ||
* Get a PulsarSourceBuilder to builder a {@link PulsarSource}. | ||
* | ||
* @return a Pulsar source builder. | ||
*/ | ||
public static <OUT> PulsarSourceBuilder<OUT> builder() { | ||
return new PulsarSourceBuilder<>(); | ||
} | ||
|
||
@Override | ||
public Boundedness getBoundedness() { | ||
return boundedness; | ||
} | ||
|
||
@Internal | ||
@Override | ||
public SourceReader<OUT, PulsarPartitionSplit> createReader(SourceReaderContext readerContext) | ||
throws Exception { | ||
// Initialize the deserialization schema before creating the pulsar reader. | ||
PulsarDeserializationSchemaInitializationContext initializationContext = | ||
new PulsarDeserializationSchemaInitializationContext(readerContext); | ||
deserializationSchema.open(initializationContext, sourceConfiguration); | ||
|
||
return PulsarSourceReaderFactory.create( | ||
readerContext, deserializationSchema, sourceConfiguration); | ||
} | ||
|
||
@Internal | ||
@Override | ||
public SplitEnumerator<PulsarPartitionSplit, PulsarSourceEnumState> createEnumerator( | ||
SplitEnumeratorContext<PulsarPartitionSplit> enumContext) { | ||
return new PulsarSourceEnumerator( | ||
subscriber, | ||
startCursor, | ||
stopCursor, | ||
rangeGenerator, | ||
sourceConfiguration, | ||
enumContext); | ||
} | ||
|
||
@Internal | ||
@Override | ||
public SplitEnumerator<PulsarPartitionSplit, PulsarSourceEnumState> restoreEnumerator( | ||
SplitEnumeratorContext<PulsarPartitionSplit> enumContext, | ||
PulsarSourceEnumState checkpoint) { | ||
return new PulsarSourceEnumerator( | ||
subscriber, | ||
startCursor, | ||
stopCursor, | ||
rangeGenerator, | ||
sourceConfiguration, | ||
enumContext, | ||
checkpoint); | ||
} | ||
|
||
@Internal | ||
@Override | ||
public SimpleVersionedSerializer<PulsarPartitionSplit> getSplitSerializer() { | ||
return PulsarPartitionSplitSerializer.INSTANCE; | ||
} | ||
|
||
@Internal | ||
@Override | ||
public SimpleVersionedSerializer<PulsarSourceEnumState> getEnumeratorCheckpointSerializer() { | ||
return PulsarSourceEnumStateSerializer.INSTANCE; | ||
} | ||
|
||
@Override | ||
public TypeInformation<OUT> getProducedType() { | ||
return deserializationSchema.getProducedType(); | ||
} | ||
} |
Oops, something went wrong.