Parsing the file storage format in AutoMQ object storage
Jinlong Wang, a programmer, closely follows cloud-native distributed infrastructure components. He is familiar with common messaging middleware architectures and their core principles, and contributes to several message queue-related open-source projects.
As a next-generation message middleware, AutoMQ makes full use of cloud storage infrastructure to deliver high performance while greatly simplifying operations. Compared with self-built Kafka clusters on physical machines, AutoMQ significantly reduces costs, largely because the underlying s3stream layer makes extensive use of S3 object storage. This article takes a closer look at how AutoMQ stores data in object storage.
To facilitate demonstration, we have set up a demo environment on macOS using Minio + AutoMQ. We have also adjusted relevant parameters to better illustrate the overall message content.
brew install minio
mkdir minio && minio server minio
Version: RELEASE.2024-04-06T05-26-02Z (go1.22.2 darwin/amd64)
API: http://192.168.31.129:9000  http://198.18.0.1:9000  http://127.0.0.1:9000
   RootUser: minioadmin
   RootPass: minioadmin
WebUI: http://192.168.31.129:57026 http://198.18.0.1:57026 http://127.0.0.1:57026
   RootUser: minioadmin
   RootPass: minioadmin
Here, the WebUI address is used to log in to the Minio console, and the API address is the S3 API provided by Minio. The S3 service's AK and SK (RootUser and RootPass) are displayed on the command line. The IP address 192.168.31.129 is the local IP address of my environment.
We need to log in to the WebUI and create a bucket for data storage. The bucket we create here is named automq.
./automq-kafka-admin.sh generate-s3-url --s3-access-key minioadmin --s3-secret-key minioadmin --s3-region ignore-here --s3-endpoint-protocol http --s3-endpoint http://192.168.31.129:9000 --s3-data-bucket automq --s3-ops-bucket automq --s3-path-style true
####################################  S3 PRECHECK #################################
[ OK ] Write s3 object
[ OK ] Read s3 object
[ OK ] Delete s3 object
[ OK ] Write S3 object
[ OK ] Upload s3 multipart object
[ OK ] Read s3 multipart object
[ OK ] Delete s3 object
##########  S3 URL RESULT ############
Your S3 URL is:
s3://192.168.31.129:9000?s3-access-key=minioadmin&s3-secret-key=minioadmin&s3-region=ignore-here&s3-endpoint-protocol=http&s3-data-bucket=automq&s3-path-style=true&s3-ops-bucket=automq&cluster-id=5kilSYquT962mUNQ8dL7qA
############  S3 URL USAGE ##############
You can use s3url to generate start command to start AutoMQ
bin/automq-kafka-admin.sh generate-start-command \
--s3-url="s3://192.168.31.129:9000?s3-access-key=minioadmin&s3-secret-key=minioadmin&s3-region=ignore-here&s3-endpoint-protocol=http&s3-data-bucket=automq&s3-path-style=true&s3-ops-bucket=automq&cluster-id=5kilSYquT962mUNQ8dL7qA" \
--controller-list="192.168.0.1:9093;192.168.0.2:9093;192.168.0.3:9093"  \
--broker-list="192.168.0.4:9092;192.168.0.5:9092"
TIPS: Replace the controller-list and broker-list with your real ip list.
Here we adjust the command so that it generates the start command for a single-node cluster.
./automq-kafka-admin.sh generate-start-command \
--s3-url="s3://192.168.31.129:9000?s3-access-key=minioadmin&s3-secret-key=minioadmin&s3-region=ignore-here&s3-endpoint-protocol=http&s3-data-bucket=automq&s3-path-style=true&s3-ops-bucket=automq&cluster-id=5kilSYquT962mUNQ8dL7qA" \
--controller-list="192.168.31.129:9093"  \
--broker-list="192.168.31.129:9092"
This yields the startup command:
./kafka-server-start.sh --s3-url="s3://192.168.31.129:9000?s3-access-key=minioadmin&s3-secret-key=minioadmin&s3-region=ignore-here&s3-endpoint-protocol=http&s3-data-bucket=automq&s3-path-style=true&s3-ops-bucket=automq&cluster-id=5kilSYquT962mUNQ8dL7qA" --override process.roles=broker,controller --override node.id=0 --override controller.quorum.voters=0@192.168.31.129:9093 --override listeners=PLAINTEXT://192.168.31.129:9092,CONTROLLER://192.168.31.129:9093 --override advertised.listeners=PLAINTEXT://192.168.31.129:9092 \
--override s3.wal.upload.threshold=5242880 \
--override metadata.log.max.snapshot.interval.ms=60000 \
--override metadata.max.retention.ms=120000
For demonstration purposes, we have adjusted some parameters:
| Parameter | Description | Value |
|---|---|---|
| s3.wal.upload.threshold | How much data accumulates on block storage before it is uploaded to object storage | 5242880 bytes (5 MiB) |
| metadata.log.max.snapshot.interval.ms | The interval for generating metadata snapshots in KRaft | 60000 ms |
| metadata.max.retention.ms | The retention period for metadata snapshots in KRaft | 120000 ms |
./kafka-topics.sh --create --topic automq-test --bootstrap-server 192.168.31.129:9092
./kafka-producer-perf-test.sh --record-size=1024 --producer-props linger.ms=0 acks=-1 bootstrap.servers=main:9092 --num-records=50000 --throughput -1 --topic automq-test
Once the writes succeed, KRaft generates a new metadata snapshot. We then stop the AutoMQ node.
At this point, Minio's WebUI shows that the data has been written to the bucket we created earlier.
The preparation is now complete.
The object names in object storage have no obvious mapping to topics. So how do we read the actual data back from the objects stored there?
AutoMQ uses a metadata management architecture based on Kafka's KRaft mode, which removes the need to maintain a separate ZooKeeper cluster and significantly improves the scalability of the Kafka cluster. AutoMQ stores the mapping from objects in object storage to actual topics in the KRaft metadata. Each interaction with object storage is logged by the metadata service, and the mapping is propagated to every Broker through the metadata replication stream between the Kafka Controller and Brokers.
Here, we use the KRaft metadata shell to examine the mapping.
KRaft periodically generates snapshots of the cluster metadata, which can be found in this directory: /tmp/kraft-combined-logs/__cluster_metadata-0
Files like 000000000000000xxxxxx-00000000xxx.checkpoint are snapshots generated by KRaft.
./kafka-metadata-shell.sh -s /tmp/kraft-combined-logs/__cluster_metadata-0/000000000000000xxxxxx-00000000xxx.checkpoint
We can then browse the entire cluster's metadata, much like using the ZooKeeper command line.
In the topics directory, we can confirm that the topic ID of 'automq-test' is LeokTjQSRYOjo9Mx0AgopQ.
In the automq directory, there are 4 subdirectories:
- kv: stores key-value metadata
- nodes: stores broker information and the StreamSetObjects held by each broker
- objects: stores metadata about the objects in object storage
- streams: stores metadata about streams
Specifically, the kv node actually records the streamId of the MetaStream corresponding to this topic, which is 3.
What is MetaStream?
AutoMQ stores the metadata of a topic in a MetaStream, which is separate from the streams that store the data logs.
A MetaStream records the metadata that maps the topic's data logs to streams, leader epoch snapshots, producer snapshots, and information about the overall data range of the topic partitions.
The content of streams/3 contains no StreamObject, which indicates that the data of this stream is held in a StreamSetObject.
We therefore examine the StreamSetObject information in the nodes directory.
There we can see that objects 68 and 78 each store a portion of the data of the stream with streamId=3.
AutoMQ tracks data objects in object storage by objectId and derives each object's key from its objectId at upload time. The object with objectId 78 has the S3 key e4000000/_kafka_HevPZiuuSiiyUU6ylL3C6Q/78; it is a 5 MB object containing parts of streams 3, 4, and 5.
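The key shown above is consistent with a simple scheme: write the objectId as eight hexadecimal digits, reverse that string to form a prefix, then append the namespace and the decimal objectId. The sketch below reproduces the key for object 78. The function name and the rule itself are assumptions inferred from this single example, so treat it as an illustration rather than AutoMQ's definitive implementation.

```python
def object_key(object_id: int, namespace: str) -> str:
    """Build a key of the form <prefix>/<namespace>/<objectId>.

    Assumption: the prefix is the objectId formatted as 8 hex digits
    and then reversed (78 -> "0000004e" -> "e4000000").
    """
    prefix = format(object_id, "08x")[::-1]
    return f"{prefix}/{namespace}/{object_id}"


print(object_key(78, "_kafka_HevPZiuuSiiyUU6ylL3C6Q"))
```

If the rule holds, object 68 would live under the prefix 44000000.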
Data files in object storage are divided into DataBlock, IndexBlock, and Footer, which store the actual data block, index block, and file metadata Footer information respectively.
The Footer is a fixed 48-byte block that records the position and size of the IndexBlock, so the index can be located quickly.
The IndexBlock is a sequence of fixed-size 36-byte DataBlockIndex entries, one per DataBlock in the file, so its length depends on the number of DataBlocks. Each DataBlockIndex identifies the data it covers (streamId, startOffset, endOffset) and where the block lives in the file (position, blockSize), which allows any DataBlock in the file to be located.
DataBlocks store the actual written data. If the stream is a data stream, each StreamRecordBatch corresponds to one RecordBatch written through Kafka. If the stream is a MetaStream, the DataBlock stores key-value pairs holding Kafka topic metadata.
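To make the layout concrete, here is a minimal Python sketch that reads the Footer and then walks the IndexBlock, treating 36 bytes as the size of one index entry. The 48-byte and 36-byte sizes come from the text; the field order, the field widths, the big-endian encoding, and the idea that the end offset is stored as a 4-byte delta are assumptions made for illustration and should be checked against the s3stream source.

```python
import struct
from typing import NamedTuple

FOOTER_SIZE = 48       # stated in the text
INDEX_ENTRY_SIZE = 36  # stated in the text

# Assumed big-endian layouts; field order and widths are illustrative.
FOOTER_HEAD = struct.Struct(">qi")      # indexStartPosition, indexBlockLength
INDEX_ENTRY = struct.Struct(">qqiiqi")  # 8 + 8 + 4 + 4 + 8 + 4 = 36 bytes


class DataBlockIndex(NamedTuple):
    stream_id: int
    start_offset: int
    end_offset: int
    record_count: int
    start_position: int
    size: int


def parse_footer(obj: bytes) -> tuple[int, int]:
    """Return (indexStartPosition, indexBlockLength) from the last 48 bytes."""
    footer = obj[-FOOTER_SIZE:]
    return FOOTER_HEAD.unpack_from(footer, 0)


def parse_index(obj: bytes) -> list[DataBlockIndex]:
    """Decode every fixed-size index entry found via the footer."""
    index_start, index_len = parse_footer(obj)
    entries = []
    for pos in range(index_start, index_start + index_len, INDEX_ENTRY_SIZE):
        sid, start, delta, count, block_pos, size = INDEX_ENTRY.unpack_from(obj, pos)
        entries.append(DataBlockIndex(sid, start, start + delta, count, block_pos, size))
    return entries


# Build a tiny synthetic object (one 262-byte block) to exercise the parser.
data = b"x" * 262
index = INDEX_ENTRY.pack(3, 4, 2, 2, 0, 262)
footer = FOOTER_HEAD.pack(len(data), len(index)).ljust(FOOTER_SIZE, b"\0")
print(parse_index(data + index + footer))
```

Because the footer has a fixed size and sits at the end of the object, a reader only needs one ranged read of the tail to find the index, and a second ranged read to fetch it.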
Each user's written data is encapsulated into a StreamRecordBatch, which is serialized and stored in the WAL.
For the detailed process and underlying principles, see the article on AutoMQ's official WeChat account, "How AutoMQ achieves high-performance WAL based on bare devices".
After data is written to the WAL, the data in the WAL is also cached in memory in the LogCache.
When enough data accumulates, it triggers the process to upload to object storage; data is directly retrieved from the LogCache for upload, minimizing read I/O on the WAL.
The StreamRecordBatches in the LogCache are sorted by (streamId, startOffset). As long as the data accumulated in each stream does not exceed the threshold, the whole batch is written to a single object in object storage in (streamId, startOffset) order.
Once all DataBlocks have been encoded and written, the IndexBlock is built from the information recorded along the way. The position of each DataBlock within the object is already known, so a DataBlockIndex is generated for each DataBlock; the number of DataBlockIndex entries equals the number of DataBlocks written.
At that point the starting position and length of the IndexBlock are also known. Finally, the Footer records the IndexBlock's position and size, so that all data in this upload batch is saved within a single object in object storage.
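The write path described above can be sketched in a few lines of Python: sort the cached batches, pack them into DataBlocks per stream, and record an index entry for each block as its position becomes known. The names Batch and layout_object are hypothetical, and the rule of closing a block once it reaches the size limit is an assumption; the sketch only computes the layout and does not serialize anything.

```python
from typing import NamedTuple


class Batch(NamedTuple):
    stream_id: int
    base_offset: int
    count: int
    payload: bytes


def layout_object(batches, max_block_size=1024 * 1024):
    """Return (index_entries, index_start_position) for one upload.

    Each index entry is
    (stream_id, start_offset, end_offset, batch_count, position, size).
    """
    index = []
    position = 0
    current = None  # [stream_id, start, end, batch_count, size] being filled

    def flush():
        nonlocal position, current
        if current is not None:
            sid, start, end, n, size = current
            index.append((sid, start, end, n, position, size))
            position += size
            current = None

    for b in sorted(batches, key=lambda b: (b.stream_id, b.base_offset)):
        if current is not None and current[0] != b.stream_id:
            flush()  # a block never mixes streams
        if current is None:
            current = [b.stream_id, b.base_offset, b.base_offset, 0, 0]
        current[2] = b.base_offset + b.count
        current[3] += 1
        current[4] += len(b.payload)
        if current[4] >= max_block_size:
            flush()  # assumed rule: close the block once it reaches the limit
    flush()
    # The IndexBlock would start right after the last DataBlock.
    return index, position


batches = [
    Batch(4, 15, 15, b"a" * 600),
    Batch(3, 4, 1, b"m" * 100),
    Batch(4, 0, 15, b"a" * 600),
    Batch(3, 5, 1, b"m" * 50),
]
print(layout_object(batches, max_block_size=1000))
```

The returned position is where the IndexBlock would begin, which is exactly the value the Footer needs to record.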
Usually, the need to read data involves quickly locating the data at the (streamId, offset) position. So, how can one quickly locate it?
From the Footer, one can obtain the position of the IndexBlock.
Entries in the IndexBlock are sorted by (streamId, startOffset), so the DataBlock that covers the requested offset can be found quickly through binary search.
Within that DataBlock, it is enough to traverse the StreamRecordBatches and compare the requested offset with each batch's baseOffset to locate the required data.
The number of StreamRecordBatches in a DataBlock affects how long it takes to find a given offset. For this reason, during upload the data of each stream is split into DataBlocks of about 1 MB, which keeps the number of StreamRecordBatches per DataBlock small enough that lookups stay fast.
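The index lookup can be sketched with Python's bisect module. The entries below are the index of object 78 as listed later in this article; the tuple layout and the function name find_block are chosen here for illustration only.

```python
from bisect import bisect_right

# (streamId, startOffset, endOffset, startPosition, size) for object 78
INDEX = [
    (3, 4, 6, 0, 262),
    (4, 50390, 51410, 262, 1060062),
    (4, 51410, 52430, 1060324, 1060062),
    (4, 52430, 53450, 2120386, 1060062),
    (4, 53450, 54470, 3180448, 1060062),
    (4, 54470, 55430, 4240510, 997706),
    (5, 8376, 9384, 5238216, 3790),
]


def find_block(index, stream_id, offset):
    """Return the index entry whose [startOffset, endOffset) covers offset."""
    keys = [(e[0], e[1]) for e in index]
    # Last entry whose (streamId, startOffset) is <= the requested position.
    i = bisect_right(keys, (stream_id, offset)) - 1
    if i < 0:
        return None
    sid, start, end, _pos, _size = index[i]
    if sid == stream_id and start <= offset < end:
        return index[i]
    return None


print(find_block(INDEX, 4, 52000))
```

A real reader would then fetch only the byte range [startPosition, startPosition + size) of the object and scan the batches inside that block.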
Every time a WAL data upload is triggered, it essentially involves uploading data from multiple streams into a single object (provided it doesn't exceed the threshold).
Assume a single Kafka Broker is responsible for 1000 topic partitions, each with one MetaStream and one data stream, so a single batch upload could contain up to 2000 streams. If each stream were uploaded as its own object, the number of S3 API calls would be 2000 times that of uploading them all as a single object, significantly increasing the overall API cost. By merging the streams into a single object with an index over its blocks, AutoMQ greatly reduces object storage API costs while still allowing efficient data retrieval.
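The arithmetic behind this comparison is small enough to write down. The sketch below counts write requests per upload round for the two strategies, one object per stream versus one merged object; the function name is hypothetical and multipart uploads are deliberately ignored.

```python
def put_requests(num_streams: int, uploads: int, merged: bool) -> int:
    """Write requests needed for a number of upload rounds (multipart ignored)."""
    return uploads if merged else uploads * num_streams


streams = 2 * 1000  # one MetaStream plus one data stream per partition
print(put_requests(streams, 1, merged=False))  # one object per stream
print(put_requests(streams, 1, merged=True))   # one merged object
```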
Now that we understand the format of files in object storage, let us try to parse object 78.
From this object file, we can read the index position and size, along with the details of each DataBlock:
indexStartPosition: 5242006
indexBlockLength: 252
streamId=3, startOffset=4, endOffset=6, recordCount=2, startPosition=0, size=262
streamId=4, startOffset=50390, endOffset=51410, recordCount=68, startPosition=262, size=1060062
streamId=4, startOffset=51410, endOffset=52430, recordCount=68, startPosition=1060324, size=1060062
streamId=4, startOffset=52430, endOffset=53450, recordCount=68, startPosition=2120386, size=1060062
streamId=4, startOffset=53450, endOffset=54470, recordCount=68, startPosition=3180448, size=1060062
streamId=4, startOffset=54470, endOffset=55430, recordCount=64, startPosition=4240510, size=997706
streamId=5, startOffset=8376, endOffset=9384, recordCount=84, startPosition=5238216, size=3790
objectId=78, ranges=[3:4-6, 4:50390-55430, 5:8376-9384, ]
Additionally, by examining the metadata of the previously mentioned StreamSetObject, it is evident that the data for streamId=4 has been divided into multiple DataBlocks.
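The dump above can be cross-checked with a few lines of Python: the DataBlocks should be laid out back to back, the IndexBlock should start where the last DataBlock ends, its length should be a whole number of 36-byte entries, and merging the per-block ranges should reproduce the ranges reported for the object. The helper names are hypothetical; the numbers are copied from the dump.

```python
# (streamId, startOffset, endOffset, recordCount, startPosition, size)
BLOCKS = [
    (3, 4, 6, 2, 0, 262),
    (4, 50390, 51410, 68, 262, 1060062),
    (4, 51410, 52430, 68, 1060324, 1060062),
    (4, 52430, 53450, 68, 2120386, 1060062),
    (4, 53450, 54470, 68, 3180448, 1060062),
    (4, 54470, 55430, 64, 4240510, 997706),
    (5, 8376, 9384, 84, 5238216, 3790),
]
INDEX_START, INDEX_LEN = 5242006, 252


def end_of_data(blocks):
    """Check that blocks are contiguous and return where the data ends."""
    pos = 0
    for *_, start_pos, size in blocks:
        assert start_pos == pos, "gap or overlap between DataBlocks"
        pos += size
    return pos


def merge_ranges(blocks):
    """Collapse per-block offset ranges into one range per stream."""
    ranges = {}
    for sid, start, end, *_ in blocks:
        lo, hi = ranges.get(sid, (start, end))
        ranges[sid] = (min(lo, start), max(hi, end))
    return ranges


print(end_of_data(BLOCKS) == INDEX_START)
print(INDEX_LEN == 36 * len(BLOCKS))
print(merge_ranges(BLOCKS))
```

Adding the 48-byte Footer gives a total object size of 5242006 + 252 + 48 = 5242306 bytes, just under the 5242880-byte upload threshold configured earlier.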
In the s3stream storage layer, a MetaStream is recorded in key-value format.
The key payload is the UTF-8 encoding of a Java string, so it can be read directly.
Using the IndexBlock, we read the data in the offset range [4, 6) for streamId=3 and parse it directly according to AutoMQ's definitions.
===========
StreamRecordBatch{streamId=3, epoch=0, baseOffset=4, count=1, size=138}
key=PRODUCER_SNAPSHOTS
offset=50555, content=[
ProducerStateEntry(producerId=7001, producerEpoch=0, currentTxnFirstOffset=OptionalLong.empty, coordinatorEpoch=-1, lastTimestamp=1712451139190, batchMetadata=[BatchMetadata(firstSeq=49935, lastSeq=49999, firstOffset=49935, lastOffset=49999, timestamp=1712451139190)]), 
ProducerStateEntry(producerId=7002, producerEpoch=0, currentTxnFirstOffset=OptionalLong.empty, coordinatorEpoch=-1, lastTimestamp=1712451146293, batchMetadata=[BatchMetadata(firstSeq=480, lastSeq=554, firstOffset=50480, lastOffset=50554, timestamp=1712451146293)])
]
===========
StreamRecordBatch{streamId=3, epoch=0, baseOffset=5, count=1, size=48}
key=PARTITION, value={"s":0,"c":0,"r":50540,"cs":false}
The Kafka producer snapshot is stored at offset=4; it records the producer's idempotence and transaction-related metadata. The partition metadata is stored at offset=5: startOffset (s) and cleanerOffset (c) are both 0, and recoverOffset (r) is 50540, mirroring the per-partition state that Kafka maintains. Because the broker had not been shut down when this record was written, the clean-shutdown flag (cs) is false; after a clean shutdown it would be true.
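As a small illustration, the PARTITION value can be decoded with a JSON parser and mapped to descriptive names. The class and field names below are hypothetical; the key meanings (s, c, r, cs) follow the interpretation given in the paragraph above.

```python
import json
from dataclasses import dataclass


@dataclass
class PartitionMeta:
    start_offset: int      # "s"
    cleaner_offset: int    # "c"
    recover_offset: int    # "r"
    clean_shutdown: bool   # "cs"


def parse_partition_meta(value: str) -> PartitionMeta:
    """Decode the JSON value stored under the PARTITION key."""
    raw = json.loads(value)
    return PartitionMeta(raw["s"], raw["c"], raw["r"], raw["cs"])


print(parse_partition_meta('{"s":0,"c":0,"r":50540,"cs":false}'))
```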
One question remains: we still do not know which stream holds the partition's data. Presumably that information is stored in the offset range [0, 4) of streamId=3. According to the StreamSetObject metadata, this range resides in object 68.
Parsing this object directly gives:
===========
StreamRecordBatch{streamId=3, epoch=0, baseOffset=0, count=1, size=44}
key=PARTITION, value={"s":0,"c":0,"r":0,"cs":false}
===========
StreamRecordBatch{streamId=3, epoch=0, baseOffset=1, count=1, size=206}
key=LOG, value={"streamMap":{"log":4,"tim":5,"txn":-1},"segmentMetas":[{"bo":0,"ct":1712450996411,"lmt":0,"s":"","lsz":0,"ls":{"s":0,"e":-1},"ts":{"s":0,"e":-1},"txs":{"s":0,"e":-1},"fbt":0,"tle":{"t":-1,"o":0}}]}
===========
StreamRecordBatch{streamId=3, epoch=0, baseOffset=2, count=1, size=36}
key=LEADER_EPOCH_CHECKPOINT, value=ElasticLeaderEpochCheckpointMeta{version=0, entries=[]}
===========
StreamRecordBatch{streamId=3, epoch=0, baseOffset=3, count=1, size=48}
key=LEADER_EPOCH_CHECKPOINT, value=ElasticLeaderEpochCheckpointMeta{version=0, entries=[EpochEntry(epoch=0, startOffset=0)]}
The record with key LOG contains the streamMap, which lists the streams that belong to this partition.
{"streamMap":{"log":4,"tim":5,"txn":-1},"segmentMetas":[{"bo":0,"ct":1712450996411,"lmt":0,"s":"","lsz":0,"ls":{"s":0,"e":-1},"ts":{"s":0,"e":-1},"txs":{"s":0,"e":-1},"fbt":0,"tle":{"t":-1,"o":0}}]}
The stream that carries the partition's data (log) is therefore stream 4.
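Extracting the stream IDs from the LOG value is a one-line JSON lookup. The sketch below uses the value shown above; the function name is hypothetical, and treating -1 as "no stream created yet" is an assumption based on the txn entry.

```python
import json

LOG_VALUE = (
    '{"streamMap":{"log":4,"tim":5,"txn":-1},"segmentMetas":[{"bo":0,'
    '"ct":1712450996411,"lmt":0,"s":"","lsz":0,"ls":{"s":0,"e":-1},'
    '"ts":{"s":0,"e":-1},"txs":{"s":0,"e":-1},"fbt":0,"tle":{"t":-1,"o":0}}]}'
)


def existing_streams(log_value: str) -> dict[str, int]:
    """Return the streamMap entries that point at a real stream.

    Assumption: an ID of -1 means the stream has not been created.
    """
    stream_map = json.loads(log_value)["streamMap"]
    return {name: sid for name, sid in stream_map.items() if sid >= 0}


streams = existing_streams(LOG_VALUE)
print(streams["log"])
```

These IDs match the streams found in object 78, which contains data for streams 3, 4, and 5.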
Thus, we have successfully parsed the information in the entire MetaStream metadata stream.
Based on the analysis above, we only need to parse the DataBlocks with streamId=4 to see how Kafka's data is stored.
Since one StreamRecordBatch corresponds to one Kafka RecordBatch, once we have a DataBlock we can try to parse its payload directly using Kafka's data format.
We fetch object 78 and parse it using Kafka's V2 record batch format.
===========
StreamRecordBatch{streamId=4, epoch=0, baseOffset=55400, count=15, size=15556}
checksum=4164202497, baseOffset=55400, maxTimestamp=1712451146370, timestampType=CREATE_TIME, baseOffset=55400, lastOffset=55414, nextOffset=55415, magic=2, producerId=7002, producerEpoch=0, baseSequence=5400, lastSequence=5414, compressionType=NONE, sizeInBytes=15556, partitionLeaderEpoch=0, isControlBatch=false, isTransactional=false
===========
StreamRecordBatch{streamId=4, epoch=0, baseOffset=55415, count=15, size=15556}
checksum=1825494209, baseOffset=55415, maxTimestamp=1712451146370, timestampType=CREATE_TIME, baseOffset=55415, lastOffset=55429, nextOffset=55430, magic=2, producerId=7002, producerEpoch=0, baseSequence=5415, lastSequence=5429, compressionType=NONE, sizeInBytes=15556, partitionLeaderEpoch=0, isControlBatch=false, isTransactional=false
As we can see, the data parsing was successful! With this, we have completed the overall analysis of AutoMQ's data format.
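For readers who want to reproduce this step, the fixed 61-byte header of a Kafka V2 RecordBatch can be decoded with Python's struct module, following the field order in the Kafka documentation. The sample bytes below are synthetic: they are packed from the values printed for the first batch above, with batchLength taken as sizeInBytes minus the 12 bytes of baseOffset and batchLength, and with baseTimestamp set to the same value as maxTimestamp as a placeholder because the real value is not shown.

```python
import struct
from typing import NamedTuple

# Kafka RecordBatch (magic=2) header, big-endian, 61 bytes in total.
HEADER = struct.Struct(">qiibIhiqqqhii")


class BatchHeader(NamedTuple):
    base_offset: int
    batch_length: int
    partition_leader_epoch: int
    magic: int
    crc: int
    attributes: int
    last_offset_delta: int
    base_timestamp: int
    max_timestamp: int
    producer_id: int
    producer_epoch: int
    base_sequence: int
    records_count: int


def parse_header(buf: bytes) -> BatchHeader:
    """Decode the fixed-size header at the start of a RecordBatch."""
    return BatchHeader(*HEADER.unpack_from(buf, 0))


# Synthetic header built from the printed values (baseTimestamp is a placeholder).
sample = HEADER.pack(55400, 15556 - 12, 0, 2, 4164202497, 0, 14,
                     1712451146370, 1712451146370, 7002, 0, 5400, 15)
h = parse_header(sample)
print(h.magic, h.base_offset + h.last_offset_delta, h.batch_length + 12)
```

The derived values (last offset 55414 and total size 15556 bytes) line up with lastOffset and sizeInBytes in the parsed output above.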
- AutoMQ Configuration Documentation: https://www.automq.com/docs/automq/deployment/configuration
- Technical Analysis: How AutoMQ achieves high performance WAL using raw devices: https://mp.weixin.qq.com/s/rPBOFyVXbmauj-Yjy-rkbg
- Kafka RecordBatch Message Format Documentation: https://kafka.apache.org/documentation#recordbatch