10000 serverid · laserdisc-io/mysql-binlog-stream@8bb6622 · GitHub
[go: up one dir, main page]

Skip to content

Commit 8bb6622

Browse files
committed
serverid
1 parent 2b7ede9 commit 8bb6622

File tree

13 files changed

+49
-84
lines changed

13 files changed

+49
-84
lines changed

binlog-stream-models/test_resources/logback-test.xml

Lines changed: 0 additions & 25 deletions
This file was deleted.

binlog-stream/app/io/laserdisc/mysql/binlog/config/package.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,12 @@ import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer
55
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.CompatibilityMode
66
import com.github.shyiko.mysql.binlog.network.SSLMode
77
import io.laserdisc.mysql.binlog.checkpoint.BinlogOffset
8+
import org.slf4j.LoggerFactory
89

910
package object config {
1011

12+
val logger = LoggerFactory.getLogger("BinLogConfigOps")
13+
1114
implicit class BinLogConfigOps(val v: BinLogConfig) extends AnyVal {
1215

1316
def mkBinaryLogClient(offset: Option[BinlogOffset] = None): BinaryLogClient = {
@@ -25,6 +28,15 @@ package object config {
2528

2629
blc.setSSLMode(if (v.useSSL) SSLMode.VERIFY_IDENTITY else SSLMode.DISABLED)
2730

31+
// ServerID should be set, (see mysql-binlog-connector-java / BinaryLogClient.setServerId for documentation)
32+
v.serverId match {
33+
case Some(sid) => blc.setServerId(sid)
34+
case None =>
35+
logger.warn(
36+
s"ServerID is not provided, so ${blc.getServerId} will be the default. This will cause issues if running multiple binlog services with this value!"
37+
)
38+
}
39+
2840
offset match {
2941
case Some(o) =>
3042
blc.setBinlogFilename(o.fileName)

binlog-stream/app/io/laserdisc/mysql/binlog/stream/MysqlBinlogStream.scala

-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ object MysqlBinlogStream {
4848
client: BinaryLogClient
4949
): Stream[F, Event] =
5050
for {
51-
d <- Stream.resource(Dispatcher[F])
51+
d <- Stream.resource(Dispatcher.parallel[F])
5252
q <- Stream.eval(Queue.bounded[F, Option[Event]](10000))
5353
proc = new MysSqlBinlogEventProcessor[F](client, q, d)
5454
/* some difficulties here during the cats3 migration. Basically, we would have used:

binlog-stream/test_resources/logback-test.xml

Lines changed: 0 additions & 26 deletions
This file was deleted.

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import sbt.Keys.scalaSource
33
organization := "io.laserdisc"
44
name := " 8000 mysql-binlog-stream"
55

6-
ThisBuild / scalaVersion := "2.13.10"
6+
ThisBuild / scalaVersion := "2.13.12"
77

88
lazy val commonSettings = Seq(
99
organization := "io.laserdisc",

mysql-binlog-stream-examples/app/main/BinLogListener.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@ object BinLogListener extends IOApp {
2424
env("DB_PASSWORD"),
2525
env("DB_URL").option,
2626
env("DB_SCHEMA"),
27-
env("USE_SSL").as[Boolean]
28-
).parMapN { case (host, port, user, password, url, schema, useSSL) =>
27+
env("USE_SSL").as[Boolean],
28+
env("SERVER_ID").as[Int]
29+
).parMapN { case (host, port, user, password, url, schema, useSSL, sid) =>
2930
BinLogConfig(
3031
host,
3132
port,
@@ -34,7 +35,8 @@ object BinLogListener extends IOApp {
3435
schema,
3536
poolSize = 1,
3637
useSSL = useSSL,
37-
urlOverride = url
38+
urlOverride = url,
39+
serverId = Some(sid)
3840
)
3941
}.load[IO]
4042

mysql-binlog-stream-shared/app/io/laserdisc/mysql/binlog/config/BinLogConfig.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,12 @@ case class BinLogConfig(
99
useSSL: Boolean = true,
1010
driverClass: String = "com.mysql.cj.jdbc.Driver",
1111
urlOverride: Option[String] = None,
12-
poolSize: Int
12+
poolSize: Int,
13+
serverId: Option[Int] = None
1314
) {
1415
def connectionURL: String =
1516
urlOverride.getOrElse(s"jdbc:mysql://$host:$port/$schema${if (useSSL) "?useSSL=true" else ""}")
1617

1718
override def toString: String =
18-
s"BinLogConfig(host=$host,port=$port,user=$user,password=**redacted**,schema=$schema,useSSL=$useSSL,driverClass=$driverClass,urlOverride=$urlOverride,poolSize=$poolSize)"
19+
s"BinLogConfig(host=$host,port=$port,user=$user,password=**redacted**,schema=$schema,useSSL=$useSSL,driverClass=$driverClass,urlOverride=$urlOverride,poolSize=$poolSize,serverId=$serverId)"
1920
}

mysql-binlog-stream-shared/test/db/MySqlContainer.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ trait MySqlContainer {
3838
mySqlContainer.getPassword,
3939
mySqlContainer.getDatabaseName,
4040
useSSL = false,
41-
poolSize = 3
41+
poolSize = 3,
42+
serverId = Some(1234)
4243
)
4344
}
4445
}

mysql-binlog-stream-shared/test/io/laserdisc/mysql/binlog/config/BinLogConfigSpec.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ class BinLogConfigSpec extends AnyWordSpec with Matchers {
1616
useSSL = true,
1717
driverClass = "com.made.up.TestDriver",
1818
urlOverride = None,
19-
poolSize = 3
19+
poolSize = 3,
20+
serverId = Some(222)
2021
)
2122

2223
"build correct connection URL" in {

mysql-binlog-stream-shared/test_resources/logback-test.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
<appender-ref ref="STDOUT" />
1111
</appender>
1212

13-
<root level="INFO">
13+
<root level="WARN">
1414
<appender-ref ref="ASYNCSTDOUT" />
1515
</root>
1616

0 commit comments

Comments
 (0)
0