Skip to content

Commit

Permalink
Remove methods deprecated before 1.0 (#1877)
Browse files Browse the repository at this point in the history
  • Loading branch information
ennru authored Aug 12, 2019
1 parent 83835bf commit fa260ab
Show file tree
Hide file tree
Showing 24 changed files with 53 additions and 372 deletions.
3 changes: 3 additions & 0 deletions cassandra/src/main/mima-filters/1.1.x.backwards.excludes
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Remove deprecated methods
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.cassandra.javadsl.CassandraFlow.createUnloggedBatchWithPassThrough")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.cassandra.javadsl.CassandraFlow.createWithPassThrough")
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,6 @@ object CassandraFlow {
.createWithPassThrough[T](parallelism, statement, (t, p) => statementBinder.apply(t, p))(session)
.asJava

@deprecated("use createWithPassThrough without ExecutionContext instead", "0.20")
def createWithPassThrough[T](parallelism: Int,
statement: PreparedStatement,
statementBinder: BiFunction[T, PreparedStatement, BoundStatement],
session: Session,
ignored: ExecutionContext): Flow[T, T, NotUsed] =
ScalaCFlow
.createWithPassThrough[T](parallelism, statement, (t, p) => statementBinder.apply(t, p))(session)
.asJava

/**
* Creates a flow that batches using an unlogged batch. Use this when most of the elements in the stream
* share the same partition key. Cassandra unlogged batches that share the same partition key will only
Expand All @@ -57,26 +47,4 @@ object CassandraFlow {
settings)(session)
.asJava

/**
* Creates a flow that batches using an unlogged batch. Use this when most of the elements in the stream
* share the same partition key. Cassandra unlogged batches that share the same partition key will only
* resolve to one write internally in Cassandra, boosting write performance.
*
* Be aware that this stage does not preserve the upstream order.
*/
@deprecated("use createUnloggedBatchWithPassThrough without ExecutionContext instead", "0.20")
def createUnloggedBatchWithPassThrough[T, K](parallelism: Int,
statement: PreparedStatement,
statementBinder: BiFunction[T, PreparedStatement, BoundStatement],
partitionKey: Function[T, K],
settings: CassandraBatchSettings,
session: Session,
ignored: ExecutionContext): Flow[T, T, NotUsed] =
ScalaCFlow
.createUnloggedBatchWithPassThrough[T, K](parallelism,
statement,
(t, p) => statementBinder.apply(t, p),
t => partitionKey.apply(t),
settings)(session)
.asJava
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import akka.stream.alpakka.file.DirectoryChange;
import akka.stream.javadsl.Source;
import akka.util.JavaDurationConverters;
import scala.concurrent.duration.FiniteDuration;

import java.nio.file.Path;

Expand All @@ -21,22 +20,6 @@
*/
public final class DirectoryChangesSource {

/**
* @param directoryPath Directory to watch
* @param pollInterval Interval between polls to the JDK watch service when a push comes in and
* there was no changes, if the JDK implementation is slow, it will not help lowering this
* @param maxBufferSize Maximum number of buffered directory changes before the stage fails
* @deprecated Use the method taking `java.time.Duration` instead
*/
@Deprecated
@SuppressWarnings("unchecked")
public static Source<Pair<Path, DirectoryChange>, NotUsed> create(
Path directoryPath, FiniteDuration pollInterval, int maxBufferSize) {
return Source.fromGraph(
new akka.stream.alpakka.file.impl.DirectoryChangesSource(
directoryPath, pollInterval, maxBufferSize, Pair::apply));
}

/**
* @param directoryPath Directory to watch
* @param pollInterval Interval between polls to the JDK watch service when a push comes in and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,28 +27,6 @@
*/
public final class FileTailSource {

/**
* Read the entire contents of a file as chunks of bytes and when the end is reached, keep reading
* newly appended data. Like the unix command `tail -f` but for bytes.
*
* <p>Reading text lines can be done with the `createLines` factory methods or by composing with
* other stages manually depending on your needs. Aborting the stage can be done by combining with
* a [[akka.stream.KillSwitch]]
*
* @param path a file path to tail
* @param maxChunkSize The max emitted size of the `ByteString`s
* @param startingPosition Offset into the file to start reading
* @param pollingInterval When the end has been reached, look for new content with this interval
* @deprecated use method taking `java.time.Duration` instead
*/
@Deprecated
public static Source<ByteString, NotUsed> create(
Path path, int maxChunkSize, long startingPosition, FiniteDuration pollingInterval) {
return Source.fromGraph(
new akka.stream.alpakka.file.impl.FileTailSource(
path, maxChunkSize, startingPosition, pollingInterval));
}

/**
* Read the entire contents of a file as chunks of bytes and when the end is reached, keep reading
* newly appended data. Like the unix command `tail -f` but for bytes.
Expand All @@ -72,29 +50,6 @@ public static Source<ByteString, NotUsed> create(
JavaDurationConverters.asFiniteDuration(pollingInterval)));
}

/**
* Read the entire contents of a file as text lines, and then when the end is reached, keep
* reading newly appended data. Like the unix command `tail -f`.
*
* <p>If a line is longer than `maxChunkSize` the stream will fail.
*
* <p>Aborting the stage can be done by combining with a [[akka.stream.KillSwitch]]
*
* @param path a file path to tail
* @param maxLineSize The max emitted size of the `ByteString`s
* @param pollingInterval When the end has been reached, look for new content with this interval
* @param lf The character or characters used as line separator
* @param charset The charset of the file
* @deprecated use method taking `java.time.Duration` instead
*/
@Deprecated
public static Source<String, NotUsed> createLines(
Path path, int maxLineSize, FiniteDuration pollingInterval, String lf, Charset charset) {
return create(path, maxLineSize, 0, pollingInterval)
.via(Framing.delimiter(ByteString.fromString(lf, charset.name()), maxLineSize))
.map(bytes -> bytes.decodeString(charset));
}

/**
* Read the entire contents of a file as text lines, and then when the end is reached, keep
* reading newly appended data. Like the unix command `tail -f`.
Expand All @@ -119,13 +74,16 @@ public static Source<String, NotUsed> createLines(
/**
* Same as {@link #createLines(Path, int, java.time.Duration, String, Charset)} but using the OS
* default line separator and UTF-8 for charset
*
* @deprecated (since 2.0.0) use method with `java.time.Duration` instead
*/
@Deprecated
public static Source<String, NotUsed> createLines(
Path path, int maxChunkSize, FiniteDuration pollingInterval) {
return createLines(
path,
maxChunkSize,
pollingInterval,
java.time.Duration.ofNanos(pollingInterval.toNanos()),
System.getProperty("line.separator"),
StandardCharsets.UTF_8);
}
Expand Down
4 changes: 4 additions & 0 deletions file/src/main/mima-filters/1.1.x.backwards.excludes
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Remove deprecated methods
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.alpakka.file.javadsl.FileTailSource.createLines")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.alpakka.file.javadsl.FileTailSource.create")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.alpakka.file.javadsl.DirectoryChangesSource.create")
13 changes: 6 additions & 7 deletions file/src/test/java/docs/javadsl/FileTailSourceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,20 @@
import akka.stream.javadsl.Source;
import akka.stream.testkit.TestSubscriber;
import akka.stream.testkit.javadsl.StreamTestKit;
import akka.testkit.TestKit;
import akka.testkit.javadsl.TestKit;
import akka.util.ByteString;
import com.google.common.jimfs.Configuration;
import com.google.common.jimfs.Jimfs;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;

import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.TimeUnit;
import java.time.Duration;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.nio.file.StandardOpenOption.APPEND;
Expand Down Expand Up @@ -60,7 +59,7 @@ public void canReadAnEntireFile() throws Exception {
path,
8192, // chunk size
0, // starting position
FiniteDuration.create(250, TimeUnit.MILLISECONDS));
Duration.ofMillis((250)));

final TestSubscriber.Probe<ByteString> subscriber = TestSubscriber.probe(system);

Expand All @@ -86,7 +85,7 @@ public void willReadNewLinesAppendedAfterReadingTheInitialContents() throws Exce
akka.stream.alpakka.file.javadsl.FileTailSource.createLines(
path,
8192, // chunk size
FiniteDuration.create(250, TimeUnit.MILLISECONDS),
Duration.ofMillis(250),
"\n",
StandardCharsets.UTF_8);

Expand Down Expand Up @@ -118,7 +117,7 @@ public void tearDown() throws Exception {
fs.close();
fs = null;
StreamTestKit.assertAllStagesStopped(materializer);
TestKit.shutdownActorSystem(system, FiniteDuration.create(10, TimeUnit.SECONDS), true);
TestKit.shutdownActorSystem(system);
system = null;
materializer = null;
}
Expand All @@ -133,7 +132,7 @@ public static void main(String... args) {

// #simple-lines
final FileSystem fs = FileSystems.getDefault();
final FiniteDuration pollingInterval = FiniteDuration.create(250, TimeUnit.MILLISECONDS);
final Duration pollingInterval = Duration.ofMillis(250);
final int maxLineSize = 8192;

final Source<String, NotUsed> lines =
Expand Down
4 changes: 4 additions & 0 deletions jms/src/main/mima-filters/1.1.x.backwards.excludes
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,7 @@ ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.alpakka.jms.Jm
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.alpakka.jms.JmsMessage.withProperties")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.alpakka.jms.JmsMessage.withHeader")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.alpakka.jms.JmsMessage.withoutDestination")

# Remove deprecated methods
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.jms.JmsConsumerSettings.withCredential")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.jms.JmsProducerSettings.withCredential")
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ final class JmsConsumerSettings private (

/** Set a JMS to subscribe to. Allows for custom handling with [[akka.stream.alpakka.jms.CustomDestination CustomDestination]]. */
def withDestination(value: Destination): JmsConsumerSettings = copy(destination = Option(value))
@deprecated("use withCredentials instead", "1.0-M1")
def withCredential(value: Credentials): JmsConsumerSettings = copy(credentials = Option(value))

/** Set JMS broker credentials. */
def withCredentials(value: Credentials): JmsConsumerSettings = copy(credentials = Option(value))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ final class JmsProducerSettings private (

/** Set a JMS destination. Allows for custom handling with [[akka.stream.alpakka.jms.CustomDestination CustomDestination]]. */
def withDestination(value: Destination): JmsProducerSettings = copy(destination = Option(value))
@deprecated("use withCredentials instead", "1.0-M1")
def withCredential(value: Credentials): JmsProducerSettings = copy(credentials = Option(value))

/** Set JMS broker credentials. */
def withCredentials(value: Credentials): JmsProducerSettings = copy(credentials = Option(value))
Expand Down
4 changes: 2 additions & 2 deletions jms/src/test/java/docs/javadsl/JmsSettingsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void producerSettings() throws Exception {
JmsProducerSettings settings =
JmsProducerSettings.create(producerConfig, new ActiveMQConnectionFactory("broker-url"))
.withTopic("target-topic")
.withCredential(Credentials.create("username", "password"))
.withCredentials(Credentials.create("username", "password"))
.withConnectionRetrySettings(retrySettings)
.withSendRetrySettings(sendRetrySettings)
.withSessionCount(10)
Expand All @@ -79,7 +79,7 @@ public void consumerSettings() throws Exception {
JmsConsumerSettings settings =
JmsConsumerSettings.create(consumerConfig, new ActiveMQConnectionFactory("broker-url"))
.withTopic("message-topic")
.withCredential(Credentials.create("username", "password"))
.withCredentials(Credentials.create("username", "password"))
.withConnectionRetrySettings(retrySettings)
.withSessionCount(10)
.withAcknowledgeMode(AcknowledgeMode.AutoAcknowledge())
Expand Down
2 changes: 2 additions & 0 deletions kinesis/src/main/mima-filters/1.1.x.backwards.excludes
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Remove deprecated methods
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.alpakka.kinesis.ShardSettings.withAtTimestamp")
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,6 @@ final class ShardSettings private (
def withStartingAfterSequenceNumber(value: String): ShardSettings =
copy(shardIteratorType = ShardIteratorType.AFTER_SEQUENCE_NUMBER, startingSequenceNumber = Option(value))

/**
* Sets `shardIteratorType` to `AT_TIMESTAMP` and uses the given `Instant` as starting timestamp.
*
* @deprecated prefer java.time.Instant to provide the timeout, since 1.0-M3
*/
@deprecated("prefer java.time.Instant to provide the timeout", "1.0-M3")
def withAtTimestamp(value: java.util.Date): ShardSettings =
copy(shardIteratorType = ShardIteratorType.AT_TIMESTAMP, atTimestamp = Option(value.toInstant))

/**
* Sets `shardIteratorType` to `AT_TIMESTAMP` and uses the given `Instant` as starting timestamp.
*/
Expand Down
20 changes: 20 additions & 0 deletions mqtt/src/main/mima-filters/1.1.x.backwards.excludes
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Remove deprecated methods
ProblemFilters.exclude[Problem]("akka.stream.alpakka.mqtt.MqttSourceSettings")
ProblemFilters.exclude[Problem]("akka.stream.alpakka.mqtt.MqttSourceSettings$")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.mqtt.javadsl.MqttFlow.atLeastOnce")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.mqtt.javadsl.MqttFlow.atMostOnce")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.mqtt.javadsl.MqttFlow.create")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.mqtt.javadsl.MqttSource.atLeastOnce")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.mqtt.javadsl.MqttSource.atMostOnce")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.mqtt.scaladsl.MqttFlow.apply")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.mqtt.scaladsl.MqttFlow.atLeastOnce")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.mqtt.scaladsl.MqttFlow.atMostOnce")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.mqtt.scaladsl.MqttSource.atLeastOnce")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.mqtt.scaladsl.MqttSource.atMostOnce")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.mqtt.MqttConnectionSettings.withConnectionTimeout")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.mqtt.MqttConnectionSettings.withDisconnectTimeout")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.mqtt.MqttConnectionSettings.withKeepAliveInterval")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.mqtt.MqttConnectionSettings.withDisconnectQuiesceTimeout")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.mqtt.javadsl.MqttMessageWithAck.messageArrivedComplete")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.mqtt.javadsl.MqttMessageWithAckImpl.messageArrivedComplete")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.mqtt.scaladsl.MqttMessageWithAck.messageArrivedComplete")
Original file line number Diff line number Diff line change
Expand Up @@ -19,41 +19,6 @@ import scala.compat.java8.FutureConverters._
*/
object MqttFlow {

/**
* Create a flow to send messages to MQTT AND subscribe to MQTT messages (without a commit handle).
*
* The materialized value completes on successful connection to the MQTT broker.
*
* @param bufferSize max number of messages read from MQTT before back-pressure applies
* @param defaultQos Quality of service level applied for messages not specifying a message specific value
* @deprecated use atMostOnce() instead
*/
@deprecated("use atMostOnce instead", "1.0-M1")
@java.lang.Deprecated
def create(sourceSettings: MqttSourceSettings,
bufferSize: Int,
defaultQos: MqttQoS): Flow[MqttMessage, MqttMessage, CompletionStage[Done]] =
atMostOnce(sourceSettings.connectionSettings,
MqttSubscriptions(sourceSettings.subscriptions),
bufferSize,
defaultQos)

/**
* Create a flow to send messages to MQTT AND subscribe to MQTT messages (without a commit handle).
*
* The materialized value completes on successful connection to the MQTT broker.
*
* @param bufferSize max number of messages read from MQTT before back-pressure applies
* @param defaultQos Quality of service level applied for messages not specifying a message specific value
* @deprecated use atMostOnce() instead
*/
@deprecated("use atMostOnce with MqttConnectionSettings and MqttSubscriptions instead", "1.0-M1")
@java.lang.Deprecated
def atMostOnce(settings: MqttSourceSettings,
bufferSize: Int,
defaultQos: MqttQoS): Flow[MqttMessage, MqttMessage, CompletionStage[Done]] =
atMostOnce(settings.connectionSettings, MqttSubscriptions(settings.subscriptions), bufferSize, defaultQos)

/**
* Create a flow to send messages to MQTT AND subscribe to MQTT messages (without a commit handle).
*
Expand All @@ -71,24 +36,6 @@ object MqttFlow {
.mapMaterializedValue(_.toJava)
.asJava

/**
* Create a flow to send messages to MQTT AND subscribe to MQTT messages with a commit handle to acknowledge message reception.
*
* The materialized value completes on successful connection to the MQTT broker.
*
* @param bufferSize max number of messages read from MQTT before back-pressure applies
* @param defaultQos Quality of service level applied for messages not specifying a message specific value
* @deprecated use atLeastOnce with MqttConnectionSettings and MqttSubscriptions instead
*/
@deprecated("use atLeastOnce with MqttConnectionSettings and MqttSubscriptions instead", "1.0-M1")
@java.lang.Deprecated
def atLeastOnce(
settings: MqttSourceSettings,
bufferSize: Int,
defaultQos: MqttQoS
): Flow[MqttMessage, MqttMessageWithAck, CompletionStage[Done]] =
atLeastOnce(settings.connectionSettings, MqttSubscriptions(settings.subscriptions), bufferSize, defaultQos)

/**
* Create a flow to send messages to MQTT AND subscribe to MQTT messages with a commit handle to acknowledge message reception.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,6 @@ sealed trait MqttMessageWithAck {
*/
val message: MqttMessage

/**
* @deprecated use commit instead, since 0.21
*/
@deprecated("use ack() instead", "1.0-M1")
@java.lang.Deprecated
def messageArrivedComplete(): CompletionStage[Done] = ack()

/**
* Signals `messageArrivedComplete` to MQTT.
*
Expand Down
Loading

0 comments on commit fa260ab

Please sign in to comment.