Postgres COPY fails past minChunkSize when using non-Pure effects #1512

Closed
dleblanc opened this issue Aug 3, 2021 · 0 comments
dleblanc commented Aug 3, 2021

We're seeing the COPY operation get closed once the stream passed to `.copyIn()` contains more than minChunkSize elements. This only happens with effect types other than `Pure`.

This is true for both 1.0.0-M1 and 1.0.0-M5.
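For context on the boundary: the parameter name suggests the stream is regrouped into chunks of at least minChunkSize elements, with a possibly smaller trailing chunk (as fs2's `chunkMin` does), so at minChunkSize + 1 elements the copy sink would receive a second, one-element chunk, which is where the failure shows up. A minimal sketch of that arithmetic in plain Scala, using `Seq.grouped` as a stand-in for the chunking (no fs2 dependency; `ChunkBoundary` is a hypothetical name for illustration):

```scala
object ChunkBoundary {
  // Regroup 1..n into chunks of `min` elements; the last chunk may be
  // smaller. A stand-in for chunkMin-style grouping on a flat input.
  def chunkSizes(n: Int, min: Int): List[Int] =
    (1 to n).grouped(min).map(_.size).toList
}
```

With minChunkSize = 200, `chunkSizes(200, 200)` yields a single chunk, while `chunkSizes(201, 200)` yields a 200-element chunk followed by a 1-element chunk, matching the point at which the tests below start failing.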

Failing test code follows:

package io.citrine.tbl.ingest

import cats.effect.IO
import cats.implicits.catsSyntaxApplicativeId
import doobie.ConnectionIO
import doobie.implicits._
import doobie.postgres.implicits._
import doobie.util.transactor.Transactor
import doobie.util.transactor.Transactor.Aux
import munit.CatsEffectSuite
import org.postgresql.ds.PGSimpleDataSource

import javax.sql.DataSource

class Fs2CopyTest extends CatsEffectSuite {

  val minChunkSize = 200

  val datasource: DataSource = {
    val ds = new PGSimpleDataSource
    ds.setURL(System.getenv("DB_URL"))
    ds.setUser(System.getenv("DB_USER"))
    ds.setPassword(System.getenv("DB_PASSWORD"))
    ds
  }
  val xa: Aux[IO, DataSource] = Transactor.fromDataSource[IO](datasource, scala.concurrent.ExecutionContext.global)

  test("A stream with a Pure effect inserts items properly") {

    sql"""
        DROP TABLE IF EXISTS demo;
        CREATE TABLE demo(id BIGSERIAL PRIMARY KEY NOT NULL, data BIGINT NOT NULL);
         """.update.run
      .transact(xa)
      .unsafeRunSync()

    // A pure stream is fine - can copy many items
    val count = 10000
    val stream = fs2.Stream.emits(1 to count)

    (sql"COPY demo(data) FROM STDIN").copyIn(stream, minChunkSize).transact(xa).unsafeRunSync()

    val queryCount =
      sql"SELECT count(*) from demo".query[Int].unique.transact(xa).unsafeRunSync()

    assertEquals(queryCount, count)
  }

  test("A stream with a ConnectionIO effect copies up to minChunkSize items") {

    sql"""
        DROP TABLE IF EXISTS demo;
        CREATE TABLE demo(id BIGSERIAL PRIMARY KEY NOT NULL, data BIGINT NOT NULL);
         """.update.run
      .transact(xa)
      .unsafeRunSync()

    // Can copy up to minChunkSize just fine with ConnectionIO
    val inputs = (1 to minChunkSize)
    val stream = fs2.Stream.emits[ConnectionIO, Int](inputs)
      .evalMap(i => (i + 2).pure[ConnectionIO])

    (sql"COPY demo(data) FROM STDIN").copyIn(stream, minChunkSize).transact(xa).unsafeRunSync()

    val queryCount =
      sql"SELECT count(*) from demo".query[Int].unique.transact(xa).unsafeRunSync()

    assertEquals(queryCount, minChunkSize)
  }

  test("A stream with a ConnectionIO effect fails to copy items with count > minChunkSize") {

    sql"""
        DROP TABLE IF EXISTS demo;
        CREATE TABLE demo(id BIGSERIAL PRIMARY KEY NOT NULL, data BIGINT NOT NULL);
         """.update.run
      .transact(xa)
      .unsafeRunSync()

    // Can't copy over minChunkSize with ConnectionIO - copy operation gets closed
    val inputs = (1 to minChunkSize + 1)
    val stream = fs2.Stream.emits[ConnectionIO, Int](inputs)
      .evalMap(i => (i + 2).pure[ConnectionIO])

    (sql"COPY demo(data) FROM STDIN").copyIn(stream, minChunkSize).transact(xa).unsafeRunSync()

    val queryCount =
      sql"SELECT count(*) from demo".query[Int].unique.transact(xa).unsafeRunSync()

    assertEquals(queryCount, minChunkSize + 1)
  }
}
@jatcwang jatcwang self-assigned this Aug 4, 2021
jatcwang added a commit that referenced this issue Dec 10, 2021