If you want to change the topic name to which a source connector writes, or the name of the object that a sink connector creates on a target, the RegexRouter is exactly what you need.
To use the Single Message Transform, you specify the pattern in the topic name to match and its replacement. To drop a prefix of test- from a topic you would use:
"transforms" : "dropTopicPrefix",
"transforms.dropTopicPrefix.type" : "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropTopicPrefix.regex" : "test-(.*)",
"transforms.dropTopicPrefix.replacement" : "$1"
Changing the topic name to which a source connector writes
Source connectors stream data to a Kafka topic whose name is based on properties defined in the particular connector. For example, the JDBC source connector uses the table name and prefixes it with the mandatory value configured in topic.prefix. Other connectors use the name of the source message queue being read from, the source file, and so on.
Often, you’ll want to route data to a topic name that matches your organisation’s topic naming conventions. Here’s an example of a JDBC source connector in which we drop the prefix that it uses:
{
"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topic.prefix" : "mysql-02-",
"poll.interval.ms" : 1000,
"tasks.max" : 1,
"table.whitelist" : "customers",
"mode" : "incrementing",
"incrementing.column.name" : "id",
"validate.non.null" : false,
"transforms" : "dropTopicPrefix",
"transforms.dropTopicPrefix.type" : "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropTopicPrefix.regex" : "mysql-02-(.*)",
"transforms.dropTopicPrefix.replacement": "$1"
}
This uses a regular expression to match the prefix mysql-02- and to store everything else (matched by .*) in a capture group, marked by the parentheses ( ), which is then referenced in the replacement as $1.
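To see what the capture group holds, here is a minimal Python sketch. It is only an illustration: Kafka Connect itself uses Java regular expressions, where the group is referenced as $1, whereas Python exposes it as group(1). The variable names are made up for the example.

```python
import re

# Topic name the JDBC source connector would build: topic.prefix + table name
topic = "mysql-02-" + "customers"

# Same pattern as transforms.dropTopicPrefix.regex
match = re.fullmatch(r"mysql-02-(.*)", topic)

whole_name = match.group(0)  # the entire matched topic name
captured = match.group(1)    # what the parentheses captured; $1 in the replacement

print(whole_name)  # mysql-02-customers
print(captured)    # customers
```

Since the replacement is just $1, the record ends up on a topic called customers.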
To learn more about RegEx, and experiment with patterns, check out the excellent RegExr.com.
Changing the object name to which a sink connector writes
Many sink connectors use the topic name as the basis for the name of the target object that they populate. The JDBC sink connector creates a table named after the topic. The Elasticsearch sink connector creates an index named after the topic. And so on.
You can use the RegexRouter to customise the name of the object that such sink connectors write to.
Here’s an example of streaming data to MySQL, using the JDBC sink connector. (See also 🎥 Kafka Connect in Action: JDBC Sink (👾 demo code) and 🎥 ksqlDB & Kafka Connect JDBC Sink in action (👾 demo code).)
We’re going to read data from a topic called day4-transactions:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day4-transactions-00/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "day4-transactions",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true"
}'
This works; you get a table created in MySQL:
mysql> show tables;
+-------------------+
| Tables_in_demo    |
+-------------------+
| day4-transactions |
+-------------------+
1 row in set (0.00 sec)
What data’s in the table?
mysql> select * from day4-transactions;
ERROR 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '-transactions' at line 1
It turns out that a hyphen in the table name does not make your life easy in MySQL. You can quote it with backticks, but that is not ideal:
mysql> select * from `day4-transactions` LIMIT 1;
+-----------+-------+---------------------------+
| card_type | cost  | item                      |
+-----------+-------+---------------------------+
| switch    | 98.77 | Westmalle Trappist Tripel |
+-----------+-------+---------------------------+
1 row in set (0.00 sec)
By default the JDBC sink connector takes the topic name as the name of the table to create. Let’s modify the above connector to drop the day4- prefix and route data to a table called transactions instead.
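Before changing the connector, it can help to preview what a pattern will do to a topic name. The sketch below approximates the transform in Python. It rests on two assumptions worth checking against the Kafka documentation: that the name is only rewritten when the whole topic name matches the pattern, and that a $1-style reference can be translated to Python's \g<1> syntax. The function route_topic is a hypothetical helper, not part of Kafka Connect.

```python
import re


def route_topic(topic: str, regex: str, replacement: str) -> str:
    """Approximate the rename: rewrite only if the whole name matches."""
    match = re.fullmatch(regex, topic)
    if match is None:
        return topic  # no match, so the name is left unchanged
    # Java writes group references as $1; Python templates use \g<1>
    template = re.sub(r"\$(\d+)", r"\\g<\1>", replacement)
    return match.expand(template)


print(route_topic("day4-transactions", "day4-(.*)", "$1"))  # transactions
print(route_topic("day5-orders", "day4-(.*)", "$1"))        # day5-orders
```

The second call shows that a topic without the day4- prefix keeps its name, so the same transform is safe to leave in place if the connector later reads from other topics.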
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day4-transactions-00/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "day4-transactions",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "dropTopicPrefix",
"transforms.dropTopicPrefix.type" : "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropTopicPrefix.regex" : "day4-(.*)",
"transforms.dropTopicPrefix.replacement" : "$1"
}'
Since we’ve PUT the above configuration, it updates the existing connector, and now we have a table in MySQL without the day4- prefix that’s much easier to work with:
mysql> show tables;
+-------------------+
| Tables_in_demo    |
+-------------------+
| day4-transactions |
| transactions      |
+-------------------+
2 rows in set (0.00 sec)
mysql> select * from transactions limit 1;
+-----------+-------+-----------------+
| card_type | cost  | item            |
+-----------+-------+-----------------+
| dankort   | 27.12 | Sapporo Premium |
+-----------+-------+-----------------+
1 row in set (0.00 sec)
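If you would rather script the configuration update than run curl by hand, the same PUT can be built with Python's standard library. This is a sketch under the same assumptions as the curl example (Kafka Connect on localhost:8083, the same connector name). The helper build_config_request is made up for illustration, and the call that actually sends the request is left commented out because it needs a running Kafka Connect worker.

```python
import json
import urllib.request


def build_config_request(base_url: str, name: str, config: dict) -> urllib.request.Request:
    """Build a PUT to /connectors/<name>/config, mirroring the curl call."""
    return urllib.request.Request(
        f"{base_url}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="PUT",
    )


config = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:mysql://mysql:3306/demo",
    "connection.user": "mysqluser",
    "connection.password": "mysqlpw",
    "topics": "day4-transactions",
    "tasks.max": "4",
    "auto.create": "true",
    "auto.evolve": "true",
    "transforms": "dropTopicPrefix",
    "transforms.dropTopicPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.dropTopicPrefix.regex": "day4-(.*)",
    "transforms.dropTopicPrefix.replacement": "$1",
}

request = build_config_request(
    "http://localhost:8083", "sink-jdbc-mysql-day4-transactions-00", config
)

# Uncomment to send it to a running Kafka Connect worker:
# with urllib.request.urlopen(request) as response:
#     print(response.status)
```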