diff --git a/core/src/main/scala/kafka/log/streamaspect/MetaStream.java b/core/src/main/scala/kafka/log/streamaspect/MetaStream.java index 4a836822e1..d18c40028c 100644 --- a/core/src/main/scala/kafka/log/streamaspect/MetaStream.java +++ b/core/src/main/scala/kafka/log/streamaspect/MetaStream.java @@ -266,7 +266,7 @@ private Map getValidMetaMap() { metaMap.put(key, ElasticLeaderEpochCheckpointMeta.decode(value.value())); break; default: - LOGGER.error("{} streamId {}: unknown meta key: {}", logIdent, streamId(), key); + metaMap.put(key, value.value().duplicate()); } }); return metaMap; diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala index f4093818dd..0e307b16a8 100644 --- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala +++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala @@ -24,6 +24,7 @@ import kafka.controller.KafkaController import kafka.coordinator.transaction.TransactionCoordinator import kafka.utils.Logging import org.apache.kafka.clients.ClientResponse +import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.errors.InvalidTopicException import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME} @@ -246,16 +247,24 @@ class DefaultAutoTopicCreationManager( txnCoordinator.transactionTopicConfigs)) // AutoMQ inject start - case "__automq_table_control" => + case "__automq_table_control" => { + val configs = new Properties() + configs.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, 20 * 1024 * 1024) new CreatableTopic() .setName(topic) .setNumPartitions(1) .setReplicationFactor(1) - case "__automq_table_data" => + .setConfigs(convertToTopicConfigCollections(configs)) + } + case "__automq_table_data" => { + val configs = new Properties() + configs.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, 20 * 1024 * 1024) new CreatableTopic() .setName(topic) .setNumPartitions(50) .setReplicationFactor(1) + .setConfigs(convertToTopicConfigCollections(configs)) + } // AutoMQ inject end case topicName =>