8000 fail cast with more information, incl schema in table meta · laserdisc-io/mysql-binlog-stream@e4c2432 · GitHub
[go: up one dir, main page]

Skip to content

Commit e4c2432

Browse files
committed
fail cast with more information, incl schema in table meta
1 parent 8ceddf8 commit e4c2432

File tree

11 files changed

+74
-73
lines changed

11 files changed

+74
-73
lines changed

binlog-stream-models/app/io/laserdisc/mysql/binlog/models/SchemaMetadata.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ case class SchemaMetadata(
2424
)
2525

2626
case class ColumnMetadata(name: String, dataType: String, ordinal: Int, isPk: Boolean)
27-
case class TableMetadata(name: String, columns: Map[Int, ColumnMetadata])
27+
case class TableMetadata(name: String, columns: Map[Int, ColumnMetadata], schema: String)
2828

2929
object SchemaMetadata {
3030
def getMetadata(schema: String) =
@@ -51,9 +51,9 @@ object SchemaMetadata {
5151
def buildSchemaMetadata[F[_]](
5252
schema: String
5353
)(implicit xa: Transactor[F], ev: MonadCancel[F, Throwable]): F[SchemaMetadata] =
54-
getMetadata(schema).to[List].map(metaToSchema).transact(xa)
54+
getMetadata(schema).to[List].map(tblMeta => metaToSchema(schema, tblMeta)).transact(xa)
5555

56-
def metaToSchema(metadata: List[Metadata]): SchemaMetadata = {
56+
def metaToSchema(schema: String, metadata: List[Metadata]): SchemaMetadata = {
5757
val t 10000 ables = metadata
5858
.groupBy(m => m.table_name)
5959
.map { case (tableName, tableInfo) =>
@@ -69,7 +69,8 @@ object SchemaMetadata {
6969
tableInfo.head.table_name,
7070
columns.groupBy(_.ordinal).map { case (ord, columns) =>
7171
ord -> columns.head
72-
}
72+
},
73+
schema
7374
)
7475
}
7576
SchemaMetadata(tables, mutable.Map.empty)

binlog-stream-models/test/io/laserdisc/mysql/binlog/models/SchemaMetadataTest.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@ package io.laserdisc.mysql.binlog.models
22

33
import cats.effect.IO
44
import com.dimafeng.testcontainers.ForAllTestContainer
5-
import db.MySqlContainer
5+
import db.MySqlContainerTest
66
import doobie.util.transactor.Transactor
77
import doobie.util.transactor.Transactor.Aux
88
import org.scalatest.matchers.should.Matchers
99
import org.scalatest.wordspec.AnyWordSpec
1010
import cats.effect.unsafe.implicits.global
1111

12-
class SchemaMetadataTest extends AnyWordSpec with ForAllTestContainer with MySqlContainer with Matchers {
12+
class SchemaMetadataTest extends AnyWordSpec with ForAllTestContainer with MySqlContainerTest with Matchers {
1313

1414
"Schema Metadata" should {
1515

binlog-stream-models/test_resources/init.sql

Lines changed: 0 additions & 13 deletions
This file was deleted.

binlog-stream/app/io/laserdisc/mysql/binlog/stream/TransactionState.scala

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import org.typelevel.log4cats.Logger
1414

1515
import java.math.BigDecimal
1616
import scala.collection.immutable.Queue
17+
import scala.reflect.ClassTag
1718

1819
case class TransactionState(
1920
transactionEvents: Queue[EventMessage],
@@ -323,15 +324,15 @@ object TransactionState {
323324
}
324325

325326
def extractPk(
326-
metadata: TableMetadata,
327+
tableMeta: TableMetadata,
327328
columns: Array[Int],
328329
row: Array[Option[Serializable]]
329330
): Array[(String, Json)] =
330331
columns
331-
.map(i => metadata.columns(i + 1))
332+
.map(i => tableMeta.columns(i + 1))
332333
.zip(row)
333334
.filter { case (meta, _) => meta.isPk }
334-
.map(mapRawToMeta)
335+
.map(mapRawToMeta(tableMeta))
335336

336337
def toTableName(tableId: Long)(implicit transactionState: TransactionState): Option[String] =
337338
transactionState.schemaMetadata.idToTable.get(tableId).map(_.name)
@@ -344,22 +345,37 @@ object TransactionState {
344345
includedColumns
345346
.map(i => tableMetadata.columns(i + 1))
346347
.zip(record)
347-
.map(mapRawToMeta)
348-
349-
def mapRawToMeta: ((ColumnMetadata, Option[Serializable])) => (String, Json) = {
350-
case (metadata, Some(value)) =>
351-
val jsonValue = metadata.dataType match {
352-
case "bigint" => Json.fromLong(value.asInstanceOf[Long])
353-
case "int" | "tinyint" => Json.fromInt(value.asInstanceOf[Int])
354-
case "date" | "datetime" | "time" => Json.fromLong(value.asInstanceOf[Long])
355-
case "decimal" => Json.fromBigDecimal(value.asInstanceOf[BigDecimal])
356-
case "float" => Json.fromFloat(value.asInstanceOf[Float]).get
348+
.map(mapRawToMeta(tableMetadata))
349+
350+
def mapRawToMeta(table: TableMetadata): ((ColumnMetadata, Option[Serializable])) => (String, Json) = {
351+
case (column, Some(value)) =>
352+
def unsafeCast[T](implicit m: ClassTag[T]): T =
353+
try
354+
value match {
355+
case _: T => value.asInstanceOf[T]
356+
case _ => m.runtimeClass.cast(value).asInstanceOf[T]
357+
}
358+
catch {
359+
case e: ClassCastException =>
360+
throw new IllegalStateException(
361+
s"""Unable to process column [schema:${table.schema}, table:${table.name}, column:${column.name}]. Failed to convert a mysql '${column.dataType}' to a ${m.runtimeClass.getCanonicalName}, but got: ${e.getMessage}""",
362+
e
363+
)
364+
}
365+
366+
val jsonValue = column.dataType match {
367+
case "bigint" => Json.fromLong(unsafeCast[Long])
368+
case "int" | "tinyint" => Json.fromInt(unsafeCast[Int])
369+
case "date" | "datetime" | "time" => Json.fromLong(unsafeCast[Long])
370+
case "decimal" => Json.fromBigDecimal(unsafeCast[BigDecimal])
371+
case "float" => Json.fromFloat(unsafeCast[Float]).get
357372
case "text" | "mediumtext" | "longtext" | "tinytext" | "varchar" | "char" =>
358-
Json.fromString(new String(value.asInstanceOf[Array[Byte]]))
359-
case "json" => Json.fromString(JsonBinary.parseAsString(value.asInstanceOf[Array[Byte]]))
373+
Json.fromString(new String(unsafeCast[Array[Byte]]))
374+
case "json" => Json.fromString(JsonBinary.parseAsString(unsafeCast[Array[Byte]]))
360375
case _ => Json.fromString(value.toString)
361376
}
362-
metadata.name -> jsonValue
377+
378+
column.name -> jsonValue
363379
case (metadata, _) => metadata.name -> Json.Null
364380
}
365381

binlog-stream/test/io/laserdisc/mysql/binlog/stream/MysqlBinlogStreamTest.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import cats.effect.{IO, Resource}
55
import com.dimafeng.testcontainers.ForAllTestContainer
66
import com.github.shyiko.mysql.binlog.BinaryLogClient
77
import com.github.shyiko.mysql.binlog.event.{EventHeaderV4, EventType}
8-
import db.MySqlContainer
8+< 8000 /span>
import db.MySqlContainerTest
99
import doobie.hikari.HikariTransactor
1010
import doobie.implicits._
1111
import io.laserdisc.mysql.binlog.database
@@ -16,7 +16,7 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger
1616

1717
import scala.language.reflectiveCalls
1818

19-
class MysqlBinlogStreamTest extends AnyWordSpec with ForAllTestContainer with MySqlContainer with Matchers {
19+
class MysqlBinlogStreamTest extends AnyWordSpec with ForAllTestContainer with MySqlContainerTest with Matchers {
2020

2121
def fixture =
2222
new {

binlog-stream/test/io/laserdisc/mysql/binlog/stream/PipesTest.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import cats.effect.{IO, Resource}
55
import cats.implicits._
66
import com.dimafeng.testcontainers.ForAllTestContainer
77
import com.github.shyiko.mysql.binlog.BinaryLogClient
8-
import db.MySqlContainer
8+
import db.MySqlContainerTest
99
import doobie.hikari.HikariTransactor
1010
import doobie.implicits._
1111
import org.typelevel.log4cats.Logger
@@ -18,7 +18,7 @@ import org.scalatest.wordspec.AnyWordSpec
1818
import scala.language.reflectiveCalls
1919
import scala.util.control
2020

21-
class PipesTest extends AnyWordSpec with Matchers with ForAllTestContainer with MySqlContainer {
21+
class PipesTest extends AnyWordSpec with Matchers with ForAllTestContainer with MySqlContainerTest {
2222

2323
def fixture =
2424
new {

binlog-stream/test/io/laserdisc/mysql/binlog/stream/TransactionStateTest.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,13 +99,15 @@ class TransactionStateTest extends AnyWordSpec with Matchers with OptionValues {
9999
}
100100
}
101101
"accumulate events within transaction" in {
102+
102103
val skuMeta =
103104
models.TableMetadata(
104105
"sku",
105106
Map(
106107
1 -> models.ColumnMetadata("id", "int", 1, isPk = true),
107108
2 -> models.ColumnMetadata("sku", "varchar", 2, isPk = false)
108-
)
109+
),
110+
schema = "testSchema"
109111
)
110112
val schemaMeta =
111113
models
@@ -169,7 +171,7 @@ class TransactionStateTest extends AnyWordSpec with Matchers with OptionValues {
169171
ordinal -> models.ColumnMetadata(colName, mType, ordinal, isPk = colName == "intCol")
170172
}.toMap
171173

172-
val tableMeta = models.TableMetadata(TableName, columnMeta)
174+
val tableMeta = models.TableMetadata(TableName, columnMeta, "testSchema")
173175

174176
val schemaMeta = models.SchemaMetadata(
175177
tables = Map("all_types_table" -> tableMeta),

binlog-stream/test_resources/init.sql

Lines changed: 0 additions & 13 deletions
This file was deleted.

mysql-binlog-stream-examples/init/init.sql

Lines changed: 0 additions & 15 deletions
This file was deleted.

mysql-binlog-stream-shared/test/db/MySqlContainer.scala renamed to mysql-binlog-stream-shared/test/db/MySqlContainerTest.scala

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,26 @@ package db
33
import com.dimafeng.testcontainers.{ForAllTestContainer, SingleContainer}
44
import io.laserdisc.mysql.binlog.config.BinLogConfig
55
import org.testcontainers.containers.MySQLContainer
6+
import org.testcontainers.utility.DockerImageName
67

78
import java.net.URI
89
import scala.concurrent.ExecutionContext
910
import scala.concurrent.ExecutionContext.Implicits
11+
import scala.jdk.CollectionConverters.MapHasAsJava
1012
import scala.language.existentials
1113

12-
trait MySqlContainer {
14+
trait MySqlContainerTest {
1315
this: ForAllTestContainer =>
1416
type OTCContainer = MySQLContainer[T] forSome {
1517
type T <: MySQLContainer[T]
1618
}
17-
val mySqlContainer: OTCContainer = new MySQLContainer("mysql:5.7").withUsername("root")
18-
mySqlContainer.withCommand("mysqld --log-bin --server-id=1 --binlog-format=ROW")
19-
mySqlContainer.withInitScript("init.sql")
19+
20+
val mySqlContainer: OTCContainer = new MySQLContainer(DockerImageName.parse("mysql:5.7"))
21+
mySqlContainer.withCommand("mysqld --log-bin --server-id=1 --binlog-format=ROW --explicit_defaults_for_timestamp=1")
22+
mySqlContainer.withTmpFs(Map("/var/lib/mysql" -> "rw").asJava)
23+
mySqlContainer.withUsername("root")
2024
mySqlContainer.withPassword("")
25+
mySqlContainer.withInitScript("init.sql")
2126

2227
override val container: SingleContainer[MySQLContainer[_]] =
2328
new SingleContainer[MySQLContainer[_]] {

0 commit comments

Comments
 (0)
0