diff --git a/cassandra/src/main/mima-filters/1.1.x.backwards.excludes b/cassandra/src/main/mima-filters/1.1.x.backwards.excludes new file mode 100644 index 0000000000..8e170333a4 --- /dev/null +++ b/cassandra/src/main/mima-filters/1.1.x.backwards.excludes @@ -0,0 +1,3 @@ +# Remove deprecated methods +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.cassandra.javadsl.CassandraFlow.createUnloggedBatchWithPassThrough") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.cassandra.javadsl.CassandraFlow.createWithPassThrough") diff --git a/cassandra/src/main/scala/akka/stream/alpakka/cassandra/javadsl/CassandraFlow.scala b/cassandra/src/main/scala/akka/stream/alpakka/cassandra/javadsl/CassandraFlow.scala index b8fb864e3a..17aaf68cce 100644 --- a/cassandra/src/main/scala/akka/stream/alpakka/cassandra/javadsl/CassandraFlow.scala +++ b/cassandra/src/main/scala/akka/stream/alpakka/cassandra/javadsl/CassandraFlow.scala @@ -26,16 +26,6 @@ object CassandraFlow { .createWithPassThrough[T](parallelism, statement, (t, p) => statementBinder.apply(t, p))(session) .asJava - @deprecated("use createWithPassThrough without ExecutionContext instead", "0.20") - def createWithPassThrough[T](parallelism: Int, - statement: PreparedStatement, - statementBinder: BiFunction[T, PreparedStatement, BoundStatement], - session: Session, - ignored: ExecutionContext): Flow[T, T, NotUsed] = - ScalaCFlow - .createWithPassThrough[T](parallelism, statement, (t, p) => statementBinder.apply(t, p))(session) - .asJava - /** * Creates a flow that batches using an unlogged batch. Use this when most of the elements in the stream * share the same partition key. Cassandra unlogged batches that share the same partition key will only @@ -57,26 +47,4 @@ object CassandraFlow { settings)(session) .asJava - /** - * Creates a flow that batches using an unlogged batch. Use this when most of the elements in the stream - * share the same partition key. Cassandra unlogged batches that share the same partition key will only - * resolve to one write internally in Cassandra, boosting write performance. - * - * Be aware that this stage does not preserve the upstream order. - */ - @deprecated("use createUnloggedBatchWithPassThrough without ExecutionContext instead", "0.20") - def createUnloggedBatchWithPassThrough[T, K](parallelism: Int, - statement: PreparedStatement, - statementBinder: BiFunction[T, PreparedStatement, BoundStatement], - partitionKey: Function[T, K], - settings: CassandraBatchSettings, - session: Session, - ignored: ExecutionContext): Flow[T, T, NotUsed] = - ScalaCFlow - .createUnloggedBatchWithPassThrough[T, K](parallelism, - statement, - (t, p) => statementBinder.apply(t, p), - t => partitionKey.apply(t), - settings)(session) - .asJava } diff --git a/file/src/main/java/akka/stream/alpakka/file/javadsl/DirectoryChangesSource.java b/file/src/main/java/akka/stream/alpakka/file/javadsl/DirectoryChangesSource.java index 498f517e3e..4ab478d67b 100644 --- a/file/src/main/java/akka/stream/alpakka/file/javadsl/DirectoryChangesSource.java +++ b/file/src/main/java/akka/stream/alpakka/file/javadsl/DirectoryChangesSource.java @@ -9,7 +9,6 @@ import akka.stream.alpakka.file.DirectoryChange; import akka.stream.javadsl.Source; import akka.util.JavaDurationConverters; -import scala.concurrent.duration.FiniteDuration; import java.nio.file.Path; @@ -21,22 +20,6 @@ */ public final class DirectoryChangesSource { - /** - * @param directoryPath Directory to watch - * @param pollInterval Interval between polls to the JDK watch service when a push comes in and - * there was no changes, if the JDK implementation is slow, it will not help lowering this - * @param maxBufferSize Maximum number of buffered directory changes before the stage fails - * @deprecated Use the method taking `java.time.Duration` instead - */ - @Deprecated - @SuppressWarnings("unchecked") - public static Source, NotUsed> create( - Path directoryPath, FiniteDuration pollInterval, int maxBufferSize) { - return Source.fromGraph( - new akka.stream.alpakka.file.impl.DirectoryChangesSource( - directoryPath, pollInterval, maxBufferSize, Pair::apply)); - } - /** * @param directoryPath Directory to watch * @param pollInterval Interval between polls to the JDK watch service when a push comes in and diff --git a/file/src/main/java/akka/stream/alpakka/file/javadsl/FileTailSource.java b/file/src/main/java/akka/stream/alpakka/file/javadsl/FileTailSource.java index 3c8b0681e1..3d77a6275a 100644 --- a/file/src/main/java/akka/stream/alpakka/file/javadsl/FileTailSource.java +++ b/file/src/main/java/akka/stream/alpakka/file/javadsl/FileTailSource.java @@ -27,28 +27,6 @@ */ public final class FileTailSource { - /** - * Read the entire contents of a file as chunks of bytes and when the end is reached, keep reading - * newly appended data. Like the unix command `tail -f` but for bytes. - * - *

Reading text lines can be done with the `createLines` factory methods or by composing with - * other stages manually depending on your needs. Aborting the stage can be done by combining with - * a [[akka.stream.KillSwitch]] - * - * @param path a file path to tail - * @param maxChunkSize The max emitted size of the `ByteString`s - * @param startingPosition Offset into the file to start reading - * @param pollingInterval When the end has been reached, look for new content with this interval - * @deprecated use method taking `java.time.Duration` instead - */ - @Deprecated - public static Source create( - Path path, int maxChunkSize, long startingPosition, FiniteDuration pollingInterval) { - return Source.fromGraph( - new akka.stream.alpakka.file.impl.FileTailSource( - path, maxChunkSize, startingPosition, pollingInterval)); - } - /** * Read the entire contents of a file as chunks of bytes and when the end is reached, keep reading * newly appended data. Like the unix command `tail -f` but for bytes. @@ -72,29 +50,6 @@ public static Source create( JavaDurationConverters.asFiniteDuration(pollingInterval))); } - /** - * Read the entire contents of a file as text lines, and then when the end is reached, keep - * reading newly appended data. Like the unix command `tail -f`. - * - *

If a line is longer than `maxChunkSize` the stream will fail. - * - *

Aborting the stage can be done by combining with a [[akka.stream.KillSwitch]] - * - * @param path a file path to tail - * @param maxLineSize The max emitted size of the `ByteString`s - * @param pollingInterval When the end has been reached, look for new content with this interval - * @param lf The character or characters used as line separator - * @param charset The charset of the file - * @deprecated use method taking `java.time.Duration` instead - */ - @Deprecated - public static Source createLines( - Path path, int maxLineSize, FiniteDuration pollingInterval, String lf, Charset charset) { - return create(path, maxLineSize, 0, pollingInterval) - .via(Framing.delimiter(ByteString.fromString(lf, charset.name()), maxLineSize)) - .map(bytes -> bytes.decodeString(charset)); - } - /** * Read the entire contents of a file as text lines, and then when the end is reached, keep * reading newly appended data. Like the unix command `tail -f`. @@ -119,13 +74,16 @@ public static Source createLines( /** * Same as {@link #createLines(Path, int, java.time.Duration, String, Charset)} but using the OS * default line separator and UTF-8 for charset + * + * @deprecated (since 2.0.0) use method with `java.time.Duration` instead */ + @Deprecated public static Source createLines( Path path, int maxChunkSize, FiniteDuration pollingInterval) { return createLines( path, maxChunkSize, - pollingInterval, + java.time.Duration.ofNanos(pollingInterval.toNanos()), System.getProperty("line.separator"), StandardCharsets.UTF_8); } diff --git a/file/src/main/mima-filters/1.1.x.backwards.excludes b/file/src/main/mima-filters/1.1.x.backwards.excludes new file mode 100644 index 0000000000..3249f308ff --- /dev/null +++ b/file/src/main/mima-filters/1.1.x.backwards.excludes @@ -0,0 +1,4 @@ +# Remove deprecated methods +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.alpakka.file.javadsl.FileTailSource.createLines") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.alpakka.file.javadsl.FileTailSource.create") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.alpakka.file.javadsl.DirectoryChangesSource.create") diff --git a/file/src/test/java/docs/javadsl/FileTailSourceTest.java b/file/src/test/java/docs/javadsl/FileTailSourceTest.java index 25829ee39e..b2178a180e 100644 --- a/file/src/test/java/docs/javadsl/FileTailSourceTest.java +++ b/file/src/test/java/docs/javadsl/FileTailSourceTest.java @@ -15,21 +15,20 @@ import akka.stream.javadsl.Source; import akka.stream.testkit.TestSubscriber; import akka.stream.testkit.javadsl.StreamTestKit; -import akka.testkit.TestKit; +import akka.testkit.javadsl.TestKit; import akka.util.ByteString; import com.google.common.jimfs.Configuration; import com.google.common.jimfs.Jimfs; import org.junit.After; import org.junit.Before; import org.junit.Test; -import scala.concurrent.duration.FiniteDuration; import java.nio.charset.StandardCharsets; import java.nio.file.FileSystem; import java.nio.file.FileSystems; import java.nio.file.Files; import java.nio.file.Path; -import java.util.concurrent.TimeUnit; +import java.time.Duration; import static java.nio.charset.StandardCharsets.UTF_8; import static java.nio.file.StandardOpenOption.APPEND; @@ -60,7 +59,7 @@ public void canReadAnEntireFile() throws Exception { path, 8192, // chunk size 0, // starting position - FiniteDuration.create(250, TimeUnit.MILLISECONDS)); + Duration.ofMillis((250))); final TestSubscriber.Probe subscriber = TestSubscriber.probe(system); @@ -86,7 +85,7 @@ public void willReadNewLinesAppendedAfterReadingTheInitialContents() throws Exce akka.stream.alpakka.file.javadsl.FileTailSource.createLines( path, 8192, // chunk size - FiniteDuration.create(250, TimeUnit.MILLISECONDS), + Duration.ofMillis(250), "\n", StandardCharsets.UTF_8); @@ -118,7 +117,7 @@ public void tearDown() throws Exception { fs.close(); fs = null; StreamTestKit.assertAllStagesStopped(materializer); - TestKit.shutdownActorSystem(system, FiniteDuration.create(10, TimeUnit.SECONDS), true); + TestKit.shutdownActorSystem(system); system = null; materializer = null; } @@ -133,7 +132,7 @@ public static void main(String... args) { // #simple-lines final FileSystem fs = FileSystems.getDefault(); - final FiniteDuration pollingInterval = FiniteDuration.create(250, TimeUnit.MILLISECONDS); + final Duration pollingInterval = Duration.ofMillis(250); final int maxLineSize = 8192; final Source lines = diff --git a/jms/src/main/mima-filters/1.1.x.backwards.excludes b/jms/src/main/mima-filters/1.1.x.backwards.excludes index 9d91b9e8bb..3b2502fa2d 100644 --- a/jms/src/main/mima-filters/1.1.x.backwards.excludes +++ b/jms/src/main/mima-filters/1.1.x.backwards.excludes @@ -4,3 +4,7 @@ ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.alpakka.jms.Jm ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.alpakka.jms.JmsMessage.withProperties") ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.alpakka.jms.JmsMessage.withHeader") ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.alpakka.jms.JmsMessage.withoutDestination") + +# Remove deprecated methods +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.jms.JmsConsumerSettings.withCredential") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.jms.JmsProducerSettings.withCredential") diff --git a/jms/src/main/scala/akka/stream/alpakka/jms/JmsConsumerSettings.scala b/jms/src/main/scala/akka/stream/alpakka/jms/JmsConsumerSettings.scala index 1ef5d07e4e..8eab44520c 100644 --- a/jms/src/main/scala/akka/stream/alpakka/jms/JmsConsumerSettings.scala +++ b/jms/src/main/scala/akka/stream/alpakka/jms/JmsConsumerSettings.scala @@ -43,8 +43,6 @@ final class JmsConsumerSettings private ( /** Set a JMS to subscribe to. Allows for custom handling with [[akka.stream.alpakka.jms.CustomDestination CustomDestination]]. */ def withDestination(value: Destination): JmsConsumerSettings = copy(destination = Option(value)) - @deprecated("use withCredentials instead", "1.0-M1") - def withCredential(value: Credentials): JmsConsumerSettings = copy(credentials = Option(value)) /** Set JMS broker credentials. */ def withCredentials(value: Credentials): JmsConsumerSettings = copy(credentials = Option(value)) diff --git a/jms/src/main/scala/akka/stream/alpakka/jms/JmsProducerSettings.scala b/jms/src/main/scala/akka/stream/alpakka/jms/JmsProducerSettings.scala index 7ef5107e36..3e5b704c29 100644 --- a/jms/src/main/scala/akka/stream/alpakka/jms/JmsProducerSettings.scala +++ b/jms/src/main/scala/akka/stream/alpakka/jms/JmsProducerSettings.scala @@ -39,8 +39,6 @@ final class JmsProducerSettings private ( /** Set a JMS destination. Allows for custom handling with [[akka.stream.alpakka.jms.CustomDestination CustomDestination]]. */ def withDestination(value: Destination): JmsProducerSettings = copy(destination = Option(value)) - @deprecated("use withCredentials instead", "1.0-M1") - def withCredential(value: Credentials): JmsProducerSettings = copy(credentials = Option(value)) /** Set JMS broker credentials. */ def withCredentials(value: Credentials): JmsProducerSettings = copy(credentials = Option(value)) diff --git a/jms/src/test/java/docs/javadsl/JmsSettingsTest.java b/jms/src/test/java/docs/javadsl/JmsSettingsTest.java index 686675b1c7..58bfd51687 100644 --- a/jms/src/test/java/docs/javadsl/JmsSettingsTest.java +++ b/jms/src/test/java/docs/javadsl/JmsSettingsTest.java @@ -60,7 +60,7 @@ public void producerSettings() throws Exception { JmsProducerSettings settings = JmsProducerSettings.create(producerConfig, new ActiveMQConnectionFactory("broker-url")) .withTopic("target-topic") - .withCredential(Credentials.create("username", "password")) + .withCredentials(Credentials.create("username", "password")) .withConnectionRetrySettings(retrySettings) .withSendRetrySettings(sendRetrySettings) .withSessionCount(10) @@ -79,7 +79,7 @@ public void consumerSettings() throws Exception { JmsConsumerSettings settings = JmsConsumerSettings.create(consumerConfig, new ActiveMQConnectionFactory("broker-url")) .withTopic("message-topic") - .withCredential(Credentials.create("username", "password")) + .withCredentials(Credentials.create("username", "password")) .withConnectionRetrySettings(retrySettings) .withSessionCount(10) .withAcknowledgeMode(AcknowledgeMode.AutoAcknowledge()) diff --git a/kinesis/src/main/mima-filters/1.1.x.backwards.excludes b/kinesis/src/main/mima-filters/1.1.x.backwards.excludes new file mode 100644 index 0000000000..3637f5b1c0 --- /dev/null +++ b/kinesis/src/main/mima-filters/1.1.x.backwards.excludes @@ -0,0 +1,2 @@ +# Remove deprecated methods +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.alpakka.kinesis.ShardSettings.withAtTimestamp") diff --git a/kinesis/src/main/scala/akka/stream/alpakka/kinesis/ShardSettings.scala b/kinesis/src/main/scala/akka/stream/alpakka/kinesis/ShardSettings.scala index 7e931f8a6e..b749ebf4de 100644 --- a/kinesis/src/main/scala/akka/stream/alpakka/kinesis/ShardSettings.scala +++ b/kinesis/src/main/scala/akka/stream/alpakka/kinesis/ShardSettings.scala @@ -49,15 +49,6 @@ final class ShardSettings private ( def withStartingAfterSequenceNumber(value: String): ShardSettings = copy(shardIteratorType = ShardIteratorType.AFTER_SEQUENCE_NUMBER, startingSequenceNumber = Option(value)) - /** - * Sets `shardIteratorType` to `AT_TIMESTAMP` and uses the given `Instant` as starting timestamp. - * - * @deprecated prefer java.time.Instant to provide the timeout, since 1.0-M3 - */ - @deprecated("prefer java.time.Instant to provide the timeout", "1.0-M3") - def withAtTimestamp(value: java.util.Date): ShardSettings = - copy(shardIteratorType = ShardIteratorType.AT_TIMESTAMP, atTimestamp = Option(value.toInstant)) - /** * Sets `shardIteratorType` to `AT_TIMESTAMP` and uses the given `Instant` as starting timestamp. */ diff --git a/mqtt/src/main/mima-filters/1.1.x.backwards.excludes b/mqtt/src/main/mima-filters/1.1.x.backwards.excludes new file mode 100644 index 0000000000..4c05314719 --- /dev/null +++ b/mqtt/src/main/mima-filters/1.1.x.backwards.excludes @@ -0,0 +1,20 @@ +# Remove deprecated methods +ProblemFilters.exclude[Problem]("akka.stream.alpakka.mqtt.MqttSourceSettings") +ProblemFilters.exclude[Problem]("akka.stream.alpakka.mqtt.MqttSourceSettings$") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.mqtt.javadsl.MqttFlow.atLeastOnce") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.mqtt.javadsl.MqttFlow.atMostOnce") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.mqtt.javadsl.MqttFlow.create") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.mqtt.javadsl.MqttSource.atLeastOnce") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.mqtt.javadsl.MqttSource.atMostOnce") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.mqtt.scaladsl.MqttFlow.apply") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.mqtt.scaladsl.MqttFlow.atLeastOnce") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.mqtt.scaladsl.MqttFlow.atMostOnce") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.mqtt.scaladsl.MqttSource.atLeastOnce") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.mqtt.scaladsl.MqttSource.atMostOnce") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.mqtt.MqttConnectionSettings.withConnectionTimeout") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.mqtt.MqttConnectionSettings.withDisconnectTimeout") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.mqtt.MqttConnectionSettings.withKeepAliveInterval") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.mqtt.MqttConnectionSettings.withDisconnectQuiesceTimeout") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.mqtt.javadsl.MqttMessageWithAck.messageArrivedComplete") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.mqtt.javadsl.MqttMessageWithAckImpl.messageArrivedComplete") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.mqtt.scaladsl.MqttMessageWithAck.messageArrivedComplete") diff --git a/mqtt/src/main/scala/akka/stream/alpakka/mqtt/javadsl/MqttFlow.scala b/mqtt/src/main/scala/akka/stream/alpakka/mqtt/javadsl/MqttFlow.scala index 02f910a9c1..f76b0a41da 100644 --- a/mqtt/src/main/scala/akka/stream/alpakka/mqtt/javadsl/MqttFlow.scala +++ b/mqtt/src/main/scala/akka/stream/alpakka/mqtt/javadsl/MqttFlow.scala @@ -19,41 +19,6 @@ import scala.compat.java8.FutureConverters._ */ object MqttFlow { - /** - * Create a flow to send messages to MQTT AND subscribe to MQTT messages (without a commit handle). - * - * The materialized value completes on successful connection to the MQTT broker. - * - * @param bufferSize max number of messages read from MQTT before back-pressure applies - * @param defaultQos Quality of service level applied for messages not specifying a message specific value - * @deprecated use atMostOnce() instead - */ - @deprecated("use atMostOnce instead", "1.0-M1") - @java.lang.Deprecated - def create(sourceSettings: MqttSourceSettings, - bufferSize: Int, - defaultQos: MqttQoS): Flow[MqttMessage, MqttMessage, CompletionStage[Done]] = - atMostOnce(sourceSettings.connectionSettings, - MqttSubscriptions(sourceSettings.subscriptions), - bufferSize, - defaultQos) - - /** - * Create a flow to send messages to MQTT AND subscribe to MQTT messages (without a commit handle). - * - * The materialized value completes on successful connection to the MQTT broker. - * - * @param bufferSize max number of messages read from MQTT before back-pressure applies - * @param defaultQos Quality of service level applied for messages not specifying a message specific value - * @deprecated use atMostOnce() instead - */ - @deprecated("use atMostOnce with MqttConnectionSettings and MqttSubscriptions instead", "1.0-M1") - @java.lang.Deprecated - def atMostOnce(settings: MqttSourceSettings, - bufferSize: Int, - defaultQos: MqttQoS): Flow[MqttMessage, MqttMessage, CompletionStage[Done]] = - atMostOnce(settings.connectionSettings, MqttSubscriptions(settings.subscriptions), bufferSize, defaultQos) - /** * Create a flow to send messages to MQTT AND subscribe to MQTT messages (without a commit handle). * @@ -71,24 +36,6 @@ object MqttFlow { .mapMaterializedValue(_.toJava) .asJava - /** - * Create a flow to send messages to MQTT AND subscribe to MQTT messages with a commit handle to acknowledge message reception. - * - * The materialized value completes on successful connection to the MQTT broker. - * - * @param bufferSize max number of messages read from MQTT before back-pressure applies - * @param defaultQos Quality of service level applied for messages not specifying a message specific value - * @deprecated use atLeastOnce with MqttConnectionSettings and MqttSubscriptions instead - */ - @deprecated("use atLeastOnce with MqttConnectionSettings and MqttSubscriptions instead", "1.0-M1") - @java.lang.Deprecated - def atLeastOnce( - settings: MqttSourceSettings, - bufferSize: Int, - defaultQos: MqttQoS - ): Flow[MqttMessage, MqttMessageWithAck, CompletionStage[Done]] = - atLeastOnce(settings.connectionSettings, MqttSubscriptions(settings.subscriptions), bufferSize, defaultQos) - /** * Create a flow to send messages to MQTT AND subscribe to MQTT messages with a commit handle to acknowledge message reception. * diff --git a/mqtt/src/main/scala/akka/stream/alpakka/mqtt/javadsl/MqttMessageWithAck.scala b/mqtt/src/main/scala/akka/stream/alpakka/mqtt/javadsl/MqttMessageWithAck.scala index 47fcc4e8d3..48561a3b6f 100644 --- a/mqtt/src/main/scala/akka/stream/alpakka/mqtt/javadsl/MqttMessageWithAck.scala +++ b/mqtt/src/main/scala/akka/stream/alpakka/mqtt/javadsl/MqttMessageWithAck.scala @@ -25,13 +25,6 @@ sealed trait MqttMessageWithAck { */ val message: MqttMessage - /** - * @deprecated use commit instead, since 0.21 - */ - @deprecated("use ack() instead", "1.0-M1") - @java.lang.Deprecated - def messageArrivedComplete(): CompletionStage[Done] = ack() - /** * Signals `messageArrivedComplete` to MQTT. * diff --git a/mqtt/src/main/scala/akka/stream/alpakka/mqtt/javadsl/MqttSource.scala b/mqtt/src/main/scala/akka/stream/alpakka/mqtt/javadsl/MqttSource.scala index 8faa348fa9..a3ba5eb3ce 100644 --- a/mqtt/src/main/scala/akka/stream/alpakka/mqtt/javadsl/MqttSource.scala +++ b/mqtt/src/main/scala/akka/stream/alpakka/mqtt/javadsl/MqttSource.scala @@ -19,19 +19,6 @@ import scala.compat.java8.FutureConverters._ */ object MqttSource { - /** - * Create a source subscribing to MQTT messages (without a commit handle). - * - * The materialized value completes on successful connection to the MQTT broker. - * - * @param bufferSize max number of messages read from MQTT before back-pressure applies - * @deprecated use atMostOnce with MqttConnectionSettings and MqttSubscriptions instead - */ - @deprecated("use atMostOnce with MqttConnectionSettings and MqttSubscriptions instead", "1.0-M1") - @java.lang.Deprecated - def atMostOnce(settings: MqttSourceSettings, bufferSize: Int): Source[MqttMessage, CompletionStage[Done]] = - atMostOnce(settings.connectionSettings, MqttSubscriptions(settings.subscriptions), bufferSize) - /** * Create a source subscribing to MQTT messages (without a commit handle). * @@ -47,19 +34,6 @@ object MqttSource { .mapMaterializedValue(_.toJava) .asJava - /** - * Create a source subscribing to MQTT messages with a commit handle to acknowledge message reception. - * - * The materialized value completes on successful connection to the MQTT broker. - * - * @param bufferSize max number of messages read from MQTT before back-pressure applies - * @deprecated use atLeastOnce with MqttConnectionSettings and MqttSubscriptions instead - */ - @deprecated("use atLeastOnce with MqttConnectionSettings and MqttSubscriptions instead", "1.0-M1") - @java.lang.Deprecated - def atLeastOnce(settings: MqttSourceSettings, bufferSize: Int): Source[MqttMessageWithAck, CompletionStage[Done]] = - atLeastOnce(settings.connectionSettings, MqttSubscriptions(settings.subscriptions), bufferSize) - /** * Create a source subscribing to MQTT messages with a commit handle to acknowledge message reception. * diff --git a/mqtt/src/main/scala/akka/stream/alpakka/mqtt/scaladsl/MqttFlow.scala b/mqtt/src/main/scala/akka/stream/alpakka/mqtt/scaladsl/MqttFlow.scala index 2ec489b6d2..c71f59e4d6 100644 --- a/mqtt/src/main/scala/akka/stream/alpakka/mqtt/scaladsl/MqttFlow.scala +++ b/mqtt/src/main/scala/akka/stream/alpakka/mqtt/scaladsl/MqttFlow.scala @@ -18,41 +18,6 @@ import scala.concurrent.Future */ object MqttFlow { - /** - * Create a flow to send messages to MQTT AND subscribe to MQTT messages (without a commit handle). - * - * The materialized value completes on successful connection to the MQTT broker. - * - * @param bufferSize max number of messages read from MQTT before back-pressure applies - * @param defaultQos Quality of service level applied for messages not specifying a message specific value - */ - @deprecated("use atMostOnce instead", "1.0-M1") - def apply(sourceSettings: MqttSourceSettings, - bufferSize: Int, - defaultQos: MqttQoS): Flow[MqttMessage, MqttMessage, Future[Done]] = - atMostOnce(sourceSettings.connectionSettings, - MqttSubscriptions(sourceSettings.subscriptions), - bufferSize, - defaultQos) - - /** - * Create a flow to send messages to MQTT AND subscribe to MQTT messages (without a commit handle). - * - * The materialized value completes on successful connection to the MQTT broker. - * - * @param bufferSize max number of messages read from MQTT before back-pressure applies - * @param defaultQos Quality of service level applied for messages not specifying a message specific value - */ - @deprecated("use atMostOnce with MqttConnectionSettings and MqttSubscriptions instead", "1.0-M1") - def atMostOnce(sourceSettings: MqttSourceSettings, - bufferSize: Int, - defaultQos: MqttQoS): Flow[MqttMessage, MqttMessage, Future[Done]] = - Flow - .fromGraph( - new MqttFlowStage(sourceSettings.connectionSettings, sourceSettings.subscriptions, bufferSize, defaultQos) - ) - .map(_.message) - /** * Create a flow to send messages to MQTT AND subscribe to MQTT messages (without a commit handle). * @@ -71,26 +36,6 @@ object MqttFlow { ) .map(_.message) - /** - * Create a flow to send messages to MQTT AND subscribe to MQTT messages with a commit handle to acknowledge message reception. - * - * The materialized value completes on successful connection to the MQTT broker. - * - * @param bufferSize max number of messages read from MQTT before back-pressure applies - * @param defaultQos Quality of service level applied for messages not specifying a message specific value - */ - @deprecated("use atMostOnce with MqttConnectionSettings and MqttSubscriptions instead", "1.0-M1") - def atLeastOnce(sourceSettings: MqttSourceSettings, - bufferSize: Int, - defaultQos: MqttQoS): Flow[MqttMessage, MqttMessageWithAck, Future[Done]] = - Flow.fromGraph( - new MqttFlowStage(sourceSettings.connectionSettings, - sourceSettings.subscriptions, - bufferSize, - defaultQos, - manualAcks = true) - ) - /** * Create a flow to send messages to MQTT AND subscribe to MQTT messages with a commit handle to acknowledge message reception. * diff --git a/mqtt/src/main/scala/akka/stream/alpakka/mqtt/scaladsl/MqttMessageWithAck.scala b/mqtt/src/main/scala/akka/stream/alpakka/mqtt/scaladsl/MqttMessageWithAck.scala index 4dbc241d28..c19024da75 100644 --- a/mqtt/src/main/scala/akka/stream/alpakka/mqtt/scaladsl/MqttMessageWithAck.scala +++ b/mqtt/src/main/scala/akka/stream/alpakka/mqtt/scaladsl/MqttMessageWithAck.scala @@ -23,9 +23,6 @@ trait MqttMessageWithAck { */ val message: MqttMessage - @deprecated("use ack() instead", "1.0-M1") - def messageArrivedComplete(): Future[Done] = ack() - /** * Signals `messageArrivedComplete` to MQTT. * diff --git a/mqtt/src/main/scala/akka/stream/alpakka/mqtt/scaladsl/MqttSource.scala b/mqtt/src/main/scala/akka/stream/alpakka/mqtt/scaladsl/MqttSource.scala index aadbf53552..a32490a2d1 100644 --- a/mqtt/src/main/scala/akka/stream/alpakka/mqtt/scaladsl/MqttSource.scala +++ b/mqtt/src/main/scala/akka/stream/alpakka/mqtt/scaladsl/MqttSource.scala @@ -17,17 +17,6 @@ import scala.concurrent.Future */ object MqttSource { - /** - * Create a source subscribing to MQTT messages (without a commit handle). - * - * The materialized value completes on successful connection to the MQTT broker. - * - * @param bufferSize max number of messages read from MQTT before back-pressure applies - */ - @deprecated("use atMostOnce with MqttConnectionSettings and MqttSubscriptions instead", "1.0-M1") - def atMostOnce(settings: MqttSourceSettings, bufferSize: Int): Source[MqttMessage, Future[Done]] = - atMostOnce(settings.connectionSettings, MqttSubscriptions(settings.subscriptions), bufferSize) - /** * Create a source subscribing to MQTT messages (without a commit handle). * @@ -43,17 +32,6 @@ object MqttSource { MqttFlow.atMostOnce(settings, subscriptions, bufferSize, defaultQos = MqttQoS.AtLeastOnce) )(Keep.right) - /** - * Create a source subscribing to MQTT messages with a commit handle to acknowledge message reception. - * - * The materialized value completes on successful connection to the MQTT broker. - * - * @param bufferSize max number of messages read from MQTT before back-pressure applies - */ - @deprecated("use atLeastOnce with MqttConnectionSettings and MqttSubscriptions instead", "1.0-M1") - def atLeastOnce(settings: MqttSourceSettings, bufferSize: Int): Source[MqttMessageWithAck, Future[Done]] = - atLeastOnce(settings.connectionSettings, MqttSubscriptions(settings.subscriptions), bufferSize) - /** * Create a source subscribing to MQTT messages with a commit handle to acknowledge message reception. * diff --git a/mqtt/src/main/scala/akka/stream/alpakka/mqtt/settings.scala b/mqtt/src/main/scala/akka/stream/alpakka/mqtt/settings.scala index 32d1a0ccb9..fd173af130 100644 --- a/mqtt/src/main/scala/akka/stream/alpakka/mqtt/settings.scala +++ b/mqtt/src/main/scala/akka/stream/alpakka/mqtt/settings.scala @@ -133,39 +133,6 @@ object MqttSubscriptions { } -/** - * @param subscriptions the mapping between a topic name and a [[MqttQoS]]. - * @deprecated use MqttConnectionSettings and MqttSubscriptions instead - */ -@deprecated("use MqttConnectionSettings and MqttSubscriptions instead", "1.0-M1") -@java.lang.Deprecated -final case class MqttSourceSettings( - connectionSettings: MqttConnectionSettings, - subscriptions: Map[String, MqttQoS] = Map.empty -) { - @annotation.varargs - def withSubscriptions(subscriptions: akka.japi.Pair[String, MqttQoS]*) = - copy(subscriptions = subscriptions.map(_.toScala).toMap) -} - -/** - * @deprecated use MqttConnectionSettings and MqttSubscriptions instead - */ -@deprecated("use MqttConnectionSettings and MqttSubscriptions instead", "1.0-M1") -@java.lang.Deprecated -object MqttSourceSettings { - - /** - * Java API: create [[MqttSourceSettings]]. - * - * @deprecated use MqttConnectionSettings and MqttSubscriptions instead - */ - @deprecated("use MqttConnectionSettings and MqttSubscriptions instead", "1.0-M1") - @java.lang.Deprecated - def create(connectionSettings: MqttConnectionSettings) = - MqttSourceSettings(connectionSettings) -} - private[mqtt] final case class MqttOfflinePersistenceSettings( bufferSize: Int = 5000, deleteOldestMessage: Boolean = false, @@ -278,34 +245,6 @@ final class MqttConnectionSettings private (val broker: String, Option(MqttOfflinePersistenceSettings(bufferSize, deleteOldestMessage, persistBuffer)) ) - /** - * @deprecated use with [[java.time.Duration]] instead - */ - @java.lang.Deprecated - def withKeepAliveInterval(keepAliveInterval: Int, unit: TimeUnit): MqttConnectionSettings = - copy(keepAliveInterval = FiniteDuration(keepAliveInterval, unit)) - - /** - * @deprecated use with [[java.time.Duration]] instead - */ - @java.lang.Deprecated - def withConnectionTimeout(connectionTimeout: Int, unit: TimeUnit): MqttConnectionSettings = - copy(connectionTimeout = FiniteDuration(connectionTimeout, unit)) - - /** - * @deprecated use with [[java.time.Duration]] instead - */ - @java.lang.Deprecated - def withDisconnectQuiesceTimeout(disconnectQuiesceTimeout: Int, unit: TimeUnit): MqttConnectionSettings = - copy(disconnectQuiesceTimeout = FiniteDuration(disconnectQuiesceTimeout, unit)) - - /** - * @deprecated use with [[java.time.Duration]] instead - */ - @java.lang.Deprecated - def withDisconnectTimeout(disconnectTimeout: Int, unit: TimeUnit): MqttConnectionSettings = - copy(disconnectTimeout = FiniteDuration(disconnectTimeout, unit)) - private def copy( broker: String = broker, clientId: String = clientId, diff --git a/mqtt/src/test/java/docs/javadsl/MqttFlowTest.java b/mqtt/src/test/java/docs/javadsl/MqttFlowTest.java index 777981b6ee..c7cb92a390 100644 --- a/mqtt/src/test/java/docs/javadsl/MqttFlowTest.java +++ b/mqtt/src/test/java/docs/javadsl/MqttFlowTest.java @@ -132,11 +132,6 @@ class MqttMessageWithAckFake extends MqttMessageWithAckImpl { acked = false; } - @Override - public CompletionStage messageArrivedComplete() { - return ack(); - } - @Override public CompletionStage ack() { acked = true; diff --git a/sqs/src/main/mima-filters/1.1.x.backwards.excludes b/sqs/src/main/mima-filters/1.1.x.backwards.excludes new file mode 100644 index 0000000000..aa9686e0f0 --- /dev/null +++ b/sqs/src/main/mima-filters/1.1.x.backwards.excludes @@ -0,0 +1,3 @@ +# Remove deprecated methods +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.sqs.SqsAckGroupedSettings.withMaxBatchWait") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.alpakka.sqs.SqsPublishGroupedSettings.withMaxBatchWait") diff --git a/sqs/src/main/scala/akka/stream/alpakka/sqs/SqsAckGroupedSettings.scala b/sqs/src/main/scala/akka/stream/alpakka/sqs/SqsAckGroupedSettings.scala index 72fdbe78c6..b372b44bd6 100644 --- a/sqs/src/main/scala/akka/stream/alpakka/sqs/SqsAckGroupedSettings.scala +++ b/sqs/src/main/scala/akka/stream/alpakka/sqs/SqsAckGroupedSettings.scala @@ -4,7 +4,6 @@ package akka.stream.alpakka.sqs -import scala.concurrent.duration.{FiniteDuration, TimeUnit} import scala.concurrent.duration._ import akka.util.JavaDurationConverters._ @@ -30,15 +29,6 @@ final class SqsAckGroupedSettings private (val maxBatchSize: Int, scala.concurrent.duration.FiniteDuration(value.toMillis, java.util.concurrent.TimeUnit.MILLISECONDS) ) - /** - * Java API - * - * @deprecated use withMaxBatchWait(java.time.Duration) instead - */ - @deprecated("use withMaxBatchWait(java.time.Duration) instead", "1.0-M1") - def withMaxBatchWait(length: Long, unit: TimeUnit): SqsAckGroupedSettings = - this.copy(maxBatchWait = FiniteDuration(length, unit)) - def withConcurrentRequests(value: Int): SqsAckGroupedSettings = copy(concurrentRequests = value) private def copy(maxBatchSize: Int = maxBatchSize, diff --git a/sqs/src/main/scala/akka/stream/alpakka/sqs/SqsPublishGroupedSettings.scala b/sqs/src/main/scala/akka/stream/alpakka/sqs/SqsPublishGroupedSettings.scala index 5a6bb8c80e..9e14e0ef76 100644 --- a/sqs/src/main/scala/akka/stream/alpakka/sqs/SqsPublishGroupedSettings.scala +++ b/sqs/src/main/scala/akka/stream/alpakka/sqs/SqsPublishGroupedSettings.scala @@ -4,9 +4,6 @@ package akka.stream.alpakka.sqs -import java.util.concurrent.TimeUnit - -import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration._ final class SqsPublishGroupedSettings private (val maxBatchSize: Int, @@ -30,15 +27,6 @@ final class SqsPublishGroupedSettings private (val maxBatchSize: Int, scala.concurrent.duration.FiniteDuration(value.toMillis, java.util.concurrent.TimeUnit.MILLISECONDS) ) - /** - * Java API - * - * @deprecated use withMaxBatchWait(java.time.Duration) instead - */ - @deprecated("use withMaxBatchWait(java.time.Duration) instead", "1.0-M1") - def withMaxBatchWait(length: Long, unit: TimeUnit): SqsPublishGroupedSettings = - copy(maxBatchWait = FiniteDuration(length, unit)) - def withConcurrentRequests(value: Int): SqsPublishGroupedSettings = copy(concurrentRequests = value) private def copy(maxBatchSize: Int = maxBatchSize, @@ -49,7 +37,11 @@ final class SqsPublishGroupedSettings private (val maxBatchSize: Int, concurrentRequests = concurrentRequests) override def toString = - s"""SqsPublishGroupedSettings(maxBatchSize=$maxBatchSize,maxBatchWait=$maxBatchWait,concurrentRequests=$concurrentRequests)""" + "SqsPublishGroupedSettings(" + + s"maxBatchSize=$maxBatchSize," + + s"maxBatchWait=$maxBatchWait," + + s"concurrentRequests=$concurrentRequests" + + ")" }