We saw in the first post how to hack together an ingestion pipeline for XML into Kafka using a source such as curl piped through xq to wrangle the XML and stream it into Kafka using kafkacat, optionally using ksqlDB to apply and register a schema for it.
The second one showed the use of any Kafka Connect source connector plus the kafka-connect-transform-xml Single Message Transformation. Now we’re going to take a look at a source connector from the community that can also be used to ingest XML data into Kafka.
FilePulse is an Apache 2.0 licensed connector written by Florian Hussonnois. It supports ingestion from flat files in lots of different formats, including XML. Florian wrote a useful blog about it here.
Ingesting XML data into Kafka with Kafka Connect and the FilePulse connector
Using a simple XML source file I first tried this, based on the tutorial:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-filepulse-xml-00/config \
-d '{
"connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.scan.directory.path":"/data/",
"fs.scan.interval.ms":"10000",
"fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.xml$",
"offset.strategy":"name",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader",
"topic":"books-00",
"internal.kafka.reporter.bootstrap.servers": "broker:29092",
"internal.kafka.reporter.topic":"_connect-file-pulse-status",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
"tasks.max": 1
}'
This failed at the point at which Kafka Connect tried to serialise the root element (x:books) to Avro:
Caused by: org.apache.avro.SchemaParseException: Illegal character in: X:books
at org.apache.avro.Schema.validateName(Schema.java:1530)
at org.apache.avro.Schema.access$400(Schema.java:87)
at org.apache.avro.Schema$Name.<init>(Schema.java:673)
at org.apache.avro.Schema.createRecord(Schema.java:212)
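The failure comes down to Avro's naming rule: a name must start with a letter or underscore and contain only letters, digits, and underscores, so the colon in a namespace-prefixed element name is rejected. Here is a minimal sketch of that rule in shell. Note that is_valid_avro_name is a hypothetical helper written for illustration, not part of Avro or Kafka Connect.

```shell
# Hypothetical helper: succeeds only if $1 matches Avro's name rule
# [A-Za-z_][A-Za-z0-9_]* (written with POSIX case patterns).
is_valid_avro_name() {
  case "$1" in
    ""|[!A-Za-z_]*|*[!A-Za-z0-9_]*) return 1 ;;  # empty, bad first char, or bad char anywhere
    *) return 0 ;;
  esac
}

# Try the root element name alongside a few child element names.
for name in "x:books" "book" "author" "pub_date"; do
  if is_valid_avro_name "$name"; then
    echo "$name: ok"
  else
    echo "$name: not a valid Avro name"
  fi
done
```

Only x:books is flagged here; the child element names pass, which is why selecting the elements beneath the root sidesteps the problem.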
The XML looks like this:
<?xml version="1.0"?>
<x:books xmlns:x="urn:books">
<book id="bk001">
<author>Writer</author>
<title>The First Book</title>
…
Since we don’t want that root element anyway we can use an XPath to specify which bits we do want, with the xpath.expression configuration element.
A useful way to figure out your XPath is to run xmllint --shell <your xml file> and navigate around the structure. The great thing about old established technologies is that there’s a ton of resources on Google from people hitting the same problems in the past - this from 2010 helped me out in writing this! My XPath expression was simply /*/book:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-filepulse-xml-01/config \
-d '{
"connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.scan.directory.path":"/data/",
"fs.scan.interval.ms":"10000",
"fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.xml$",
"offset.strategy":"name",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader",
"xpath.expression": "/*/book",
"topic":"books-01",
"internal.kafka.reporter.bootstrap.servers": "broker:29092",
"internal.kafka.reporter.topic":"_connect-file-pulse-status",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
"tasks.max": 1
}'
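An XPath expression like the one in the config above can also be checked non-interactively with xmllint's --xpath option before it goes anywhere near the connector. This is a rough sketch, assuming xmllint is installed. The sample file is a cut-down stand-in modelled on the structure shown earlier, not the real source file, and count_books is a hypothetical helper name.

```shell
# Write a small stand-in XML file (two books, same shape as the real data).
sample_xml="${TMPDIR:-/tmp}/books-sample.xml"
cat > "$sample_xml" <<'EOF'
<?xml version="1.0"?>
<x:books xmlns:x="urn:books">
  <book id="bk001">
    <author>Writer</author>
    <title>The First Book</title>
  </book>
  <book id="bk002">
    <author>Poet</author>
    <title>The Poet's First Poem</title>
  </book>
</x:books>
EOF

# Hypothetical helper: print how many nodes /*/book selects in the given file.
count_books() {
  xmllint --xpath 'count(/*/book)' "$1"
}

if command -v xmllint >/dev/null 2>&1; then
  count_books "$sample_xml"
else
  echo "xmllint not installed; skipping XPath check" >&2
fi
```

For this two-book sample the count should be 2; a count of 0 would suggest the expression is not selecting the book elements. The connector config above uses the same expression.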
This worked, and we can confirm that using a consumer against the topic - here I’m using ksqlDB just cos it’s quicker:
ksql> PRINT 'books-01' FROM BEGINNING;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: AVRO or KAFKA_STRING
rowtime: 2020/10/02 11:26:45.222 Z, key: <null>, value: {"id": "bk001", "author": "Writer", "title": "The First Book", "genre": "Fiction", "price": "44.95", "pub_date": "2000-10-01", "review": "An amazing story of nothing."}
rowtime: 2020/10/02 11:26:45.226 Z, key: <null>, value: {"id": "bk002", "author": "Poet", "title": "The Poet's First Poem", "genre": "Poem", "price": "24.95", "pub_date": "2000-10-01", "review": "Least poetic poems."}
The value has been serialised as Avro, with the schema inferred from the XML itself. We can verify it by looking it up from the Schema Registry:
docker exec --tty schema-registry \
curl -s "http://localhost:8081/subjects/books-01-value/versions/1" | \
jq '.schema|fromjson[1]'
{
"type": "record",
"name": "ConnectDefault",
"namespace": "io.confluent.connect.avro",
"fields": [
{ "name": "id", "type": [ "null", "string" ], "default": null },
{ "name": "author", "type": [ "null", "string" ], "default": null },
{ "name": "title", "type": [ "null", "string" ], "default": null },
…
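The [1] in the jq filter is there because the registered schema is stored as a JSON string holding an array (a union), with the record as its second element. Below is a small sketch against a hypothetical, trimmed-down response, assuming jq is installed. The real response carries the full field list, and the "null" first member is an assumption here; record_name is a hypothetical helper name.

```shell
# Hypothetical, trimmed-down Schema Registry response. The "schema" value is
# itself a JSON document encoded as a string, hence the need for fromjson.
response='{"subject":"books-01-value","version":1,"schema":"[\"null\",{\"type\":\"record\",\"name\":\"ConnectDefault\"}]"}'

# Hypothetical helper: decode the embedded schema, take the second member of
# the union (index 1), and print the record's name.
record_name() {
  printf '%s' "$1" | jq -r '.schema | fromjson | .[1].name'
}

if command -v jq >/dev/null 2>&1; then
  record_name "$response"
else
  echo "jq not installed; skipping" >&2
fi
```

Dropping the index and running just .schema|fromjson shows the whole union, which is a quick way to see what sits alongside the record.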
Avro is set as the default converter in my Kafka Connect worker configuration; I could override it if I wanted to use Protobuf, for example, by setting the necessary value.converter configuration:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-filepulse-xml-02/config \
-d '{
"connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.scan.directory.path":"/data/",
"fs.scan.interval.ms":"10000",
"fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.xml$",
"offset.strategy":"name",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader",
"xpath.expression": "/*/book",
"topic":"books-02",
"internal.kafka.reporter.bootstrap.servers": "broker:29092",
"internal.kafka.reporter.topic":"_connect-file-pulse-status",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
"tasks.max": 1,
"value.converter":"io.confluent.connect.protobuf.ProtobufConverter",
"value.converter.schema.registry.url":"http://schema-registry:8081"
}'
This time the data’s written as Protobuf, which we can also validate from ksqlDB (it takes a best-guess at the serialisation method when it reads the messages, and automagically picks the appropriate deserialiser):
ksql> PRINT 'books-02' FROM BEGINNING;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: PROTOBUF or KAFKA_STRING
rowtime: 2020/10/02 11:31:34.066 Z, key: <null>, value: id: "bk001" author: "Writer" title: "The First Book" genre: "Fiction" price: "44.95" pub_date: "2000-10-01" review: "An amazing story of nothing."
rowtime: 2020/10/02 11:31:34.068 Z, key: <null>, value: id: "bk002" author: "Poet" title: "The Poet\'s First Poem" genre: "Poem" price: "24.95" pub_date: "2000-10-01" review: "Least poetic poems."
A bit of ksqlDB
With the data streaming into a Kafka topic from a flat file, we can do this:
ksql> CREATE STREAM BOOKS WITH (KAFKA_TOPIC='books-02',VALUE_FORMAT='PROTOBUF');
Message
----------------
Stream created
----------------
ksql>
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql> SELECT * FROM BOOKS EMIT CHANGES LIMIT 2;
+--------+---------+-----------------------+---------+--------+------------+----------------------------+
|ID |AUTHOR |TITLE |GENRE |PRICE |PUB_DATE |REVIEW |
+--------+---------+-----------------------+---------+--------+------------+----------------------------+
|bk001 |Writer |The First Book |Fiction |44.95 |2000-10-01 |An amazing story of nothing |
|bk002 |Poet |The Poet's First Poem |Poem |24.95 |2000-10-01 |Least poetic poems. |
Limit Reached
Query terminated
For more permutations of XML ingest with FilePulse check out this blog.
What are my other options for getting XML into Kafka?
FilePulse worked great here, and it clearly has a lot of flexibility in its processing and file handling options. It’s also really handy that it can infer the schema of the payload from the XML without requiring an XSD.
But what if your data isn’t in a flat file? Unfortunately in this situation you will need to reach for another option, such as the approaches covered in the first two posts in this series.
👾 Try it out!
You can find the code to run this for yourself using Docker Compose on GitHub.