8000 Merge branch 'master' into master · laserdisc-io/mysql-binlog-stream@91c7109 · GitHub
[go: up one dir, main page]

8000
Skip to content

Commit 91c7109

Browse files
authored
Merge branch 'master' into master
2 parents 45813db + e08db11 commit 91c7109

File tree

15 files changed

+113
-115
lines changed

15 files changed

+113
-115
lines changed

binlog-stream-models/test/io/laserdisc/mysql/binlog/models/SchemaMetadataTest.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,17 @@ class SchemaMetadataTest extends AnyWordSpec with ForAllTestContainer with MySql
1414
"Schema Metadata" should {
1515

1616
"restore schema state from DB" in {
17+
18+
val props = new java.util.Properties()
19+
props.put("user", mySqlContainer.getUsername)
20+
props.put("password", mySqlContainer.getPassword)
21+
1722
implicit val testTransactor: Aux[IO, Unit] =
1823
Transactor.fromDriverManager[IO](
1924
mySqlContainer.getDriverClassName,
2025
s"${mySqlContainer.getJdbcUrl}?useSSL=false",
21-
mySqlContainer.getUsername,
22-
mySqlContainer.getPassword
26+
props,
27+
None
2328
)
2429
val schemaState =
2530
SchemaMetadata.buildSchemaMetadata("test").unsafeRunSync()

binlog-stream-models/test_resources/logback-test.xml

Lines changed: 0 additions & 25 deletions
This file was deleted.

binlog-stream/app/io/laserdisc/mysql/binlog/config/package.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,12 @@ import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer
55
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.CompatibilityMode
66
import com.github.shyiko.mysql.binlog.network.SSLMode
77
import io.laserdisc.mysql.binlog.checkpoint.BinlogOffset
8+
import org.slf4j.LoggerFactory
89

910
package object config {
1011

12+
val logger = LoggerFactory.getLogger("BinLogConfigOps")
13+
1114
implicit class BinLogConfigOps(val v: BinLogConfig) extends AnyVal {
1215

1316
def mkBinaryLogClient(offset: Option[BinlogOffset] = None): BinaryLogClient = {
@@ -25,6 +28,15 @@ package object config {
2528

2629
blc.setSSLMode(if (v.useSSL) SSLMode.VERIFY_IDENTITY else SSLMode.DISABLED)
2730

31+
// ServerID should be set, (see mysql-binlog-connector-java / BinaryLogClient.setServerId for documentation)
32+
v.serverId match {
33+
case Some(sid) => blc.setServerId(sid)
34+
case None =>
35+
logger.warn(
36+
s"ServerID is not provided, so ${blc.getServerId} will be the default. This will cause issues if running multiple binlog services with this value!"
37+
)
38+
}
39+
2840
offset match {
2941
case Some(o) =>
3042
blc.setBinlogFilename(o.fileName)

binlog-stream/app/io/laserdisc/mysql/binlog/stream/MysqlBinlogStream.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ object MysqlBinlogStream {
4848
client: BinaryLogClient
4949
): Stream[F, Event] =
5050
for {
51-
d <- Stream.resource(Dispatcher[F])
51+
d <- Stream.resource(Dispatcher.parallel[F])
5252
q <- Stream.eval(Queue.bounded[F, Option[Event]](10000))
5353
proc = new MysSqlBinlogEventProcessor[F](client, q, d)
5454
/* some difficulties here during the cats3 migration. Basically, we would have used:

binlog-stream/test/io/laserdisc/mysql/binlog/stream/TransactionStateTest.scala

Lines changed: 50 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -140,21 +140,28 @@ class TransactionStateTest extends AnyWordSpec with Matchers with OptionValues {
140140
}
141141

142142
"transform binlog write event into json" in {
143-
val skuMeta =
144-
models.TableMetadata(
145-
"sku",
146-
Map(
147-
1 -> models.ColumnMetadata("id", "int", 1, isPk = true),
148-
2 -> models.ColumnMetadata("sku", "varchar", 2, isPk = false)
149-
)
150-
)
151-
val schemaMeta =
152-
models
153-
.SchemaMetadata(tables = Map("sku" -> skuMeta), idToTable = mutable.Map(123L -> skuMeta))
143+
144+
val TableName = "all_types_table"
145+
146+
val allTypes =
147+
List("int", "tinyint", "bigint", "date", "datetime", "decimal", "float", "text", "tinytext", "mediumtext", "longtext", "varchar")
148+
149+
val columnMeta = allTypes.zipWithIndex.map { case (mType, idx) =>
150+
val colName = s"${mType}Col"
151+
val ordinal = idx + 1
152+
ordinal -> models.ColumnMetadata(colName, mType, ordinal, isPk = colName == "intCol")
153+
}.toMap
154+
155+
val tableMeta = models.TableMetadata(TableName, columnMeta)
156+
157+
val schemaMeta = models.SchemaMetadata(
158+
tables = Map("all_types_table" -> tableMeta),
159+
idToTable = mutable.Map(123L -> tableMeta)
160+
)
154161

155162
val json = TransactionState.convertToJson(
156-
tableMeta = schemaMeta.tables("sku"),
157-
includedColumns = Array(0, 1),
163+
tableMeta = schemaMeta.tables(TableName),
164+
includedColumns = 0.until(columnMeta.size).toArray,
158165
timestamp = 12345L,
159166
action = "create",
160167
fileName = "file.12345",
@@ -163,20 +170,41 @@ class TransactionStateTest extends AnyWordSpec with Matchers with OptionValues {
163170
None,
164171
Some(
165172
Array(
166-
Some(1.asInstanceOf[io.Serializable]),
167-
Some("sku1".getBytes.asInstanceOf[io.Serializable])
173+
Some(100.asInstanceOf[io.Serializable]), // int
174+
Some(200.asInstanceOf[io.Serializable]), // tinyint
175+
Some(Long.MaxValue.asInstanceOf[io.Serializable]), // bigint
176+
Some(1672531200000L.asInstanceOf[io.Serializable]), // date
177+
Some(1672567872000L.asInstanceOf[io.Serializable]), // datetime
178+
Some(java.math.BigDecimal.valueOf(99887766).asInstanceOf[io.Serializable]), // decimal
179+
Some(111.222f.asInstanceOf[io.Serializable]), // float
180+
Some("some text".getBytes.asInstanceOf[io.Serializable]), // text
181+
Some("some tinytext".getBytes.asInstanceOf[io.Serializable]), // tinytext
182+
Some("some mediumtext".getBytes.asInstanceOf[io.Serializable]), // mediumtext
183+
Some("some longtext".getBytes.asInstanceOf[io.Serializable]), // longtext
184+
Some("a varchar".getBytes.asInstanceOf[io.Serializable]) // varchar
168185
)
169186
)
170187
)
171188
)
172-
val _pk = root.id.int
173-
val _id = root.after.id.int
174-
val _sku = root.after.sku.string
175-
json.table should be("sku")
189+
190+
json.table should be(TableName)
176191
json.timestamp should be(12345L)
177-
_id.getOption(json.row).value should be(1)
178-
_sku.getOption(json.row).value should be("sku1")
179-
_pk.getOption(json.pk).value should be(1)
192+
193+
val after = root.after
194+
195+
after.intCol.int.getOption(json.row).value should be(100)
196+
after.tinyintCol.int.getOption(json.row).value should be(200)
197+
after.bigintCol.long.getOption(json.row).value should be(Long.MaxValue)
198+
after.dateCol.long.getOption(json.row).value should be(1672531200000L)
199+
after.datetimeCol.long.getOption(json.row).value should be(1672567872000L)
200+
after.decimalCol.bigDecimal.getOption(json.row).value should be(BigDecimal.valueOf(99887766))
201+
after.floatCol.double.getOption(json.row).value should be(111.222)
202+
after.textCol.string.getOption(json.row).value should be("some text")
203+
after.tinytextCol.string.getOption(json.row).value should be("some tinytext")
204+
after.mediumtextCol.string.getOption(json.row).value should be("some mediumtext")
205+
after.longtextCol.string.getOption(json.row).value should be("some longtext")
206+
after.varcharCol.string.getOption(json.row).value should be("a varchar")
207+
180208
}
181209

182210
"extract 'truncated table sku' from SQL" in {

binlog-stream/test_resources/logback-test.xml

Lines changed: 0 additions & 26 deletions
This file was deleted.

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import sbt.Keys.scalaSource
33
organization := "io.laserdisc"
44
name := "mysql-binlog-stream"
55

6-
ThisBuild / scalaVersion := "2.13.10"
6+
ThisBuild / scalaVersion := "2.13.12"
77

88
lazy val commonSettings = Seq(
99
organization := "io.laserdisc",

mysql-binlog-stream-examples/app/main/BinLogListener.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@ object BinLogListener extends IOApp {
2424
env("DB_PASSWORD"),
2525
env("DB_URL").option,
2626
env("DB_SCHEMA"),
27-
env("USE_SSL").as[Boolean]
28-
).parMapN { case (host, port, user, password, url, schema, useSSL) =>
27+
env("USE_SSL").as[Boolean],
28+
env("SERVER_ID").as[Int]
29+
).parMapN { case (host, port, user, password, url, schema, useSSL, sid) =>
2930
BinLogConfig(
3031
host,
3132
port,
@@ -34,7 +35,8 @@ object BinLogListener extends IOApp {
3435
schema,
3536
poolSize = 1,
3637
useSSL = useSSL,
37-
urlOverride = url
38+
urlOverride = url,
39+
serverId = Some(sid)
3840
)
3941
}.load[IO]
4042

mysql-binlog-stream-shared/app/io/laserdisc/mysql/binlog/config/BinLogConfig.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,12 @@ case class BinLogConfig(
99
useSSL: Boolean = true,
1010
driverClass: String = "com.mysql.cj.jdbc.Driver",
1111
urlOverride: Option[String] = None,
12-
poolSize: Int
12+
poolSize: Int,
13+
serverId: Option[Int] = None
1314
) {
1415
def connectionURL: String =
1516
urlOverride.getOrElse(s"jdbc:mysql://$host:$port/$schema${if (useSSL) "?useSSL=true" else ""}")
1617

1718
override def toString: String =
18-
s"BinLogConfig(host=$host,port=$port,user=$user,password=**redacted**,schema=$schema,useSSL=$useSSL,driverClass=$driverClass,urlOverride=$urlOverride,poolSize=$poolSize)"
19+
s"BinLogConfig(host=$host,port=$port,user=$user,password=**redacted**,schema=$schema,useSSL=$useSSL,driverClass=$driverClass,urlOverride=$urlOverride,poolSize=$poolSize,serverId=$serverId)"
1920
}

mysql-binlog-stream-shared/test/db/MySqlContainer.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ trait MySqlContainer {
3838
mySqlContainer.getPassword,
3939
mySqlContainer.getDatabaseName,
4040
useSSL = false,
41-
poolSize = 3
41+
poolSize = 3,
42+
serverId = Some(1234)
4243
)
4344
}
4445
}

0 commit comments

Comments
 (0)
0