Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: AutoMQ/automq
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 1.3.2-rc0
Choose a base ref
...
head repository: AutoMQ/automq
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: main
Choose a head ref

Commits on Nov 1, 2024

  1. perf: limit the inflight requests (#2100)

    * docs: add todos
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * perf(network): limit the inflight requests by size
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * perf(ReplicaManager): limit the queue size of the `fetchExecutor`s
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * perf(KafkaApis): limit the queue size of async request handlers
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * refactor(network): make "queued.max.requests.size.bytes" configurable
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * style: fix lint
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * fix(network): limit the min queued request size per queue
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    ---------
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    Chillax-0v0 authored Nov 1, 2024
    Copy the full SHA
    a7250cd View commit details

Commits on Nov 3, 2024

  1. fix(issue2108): avoid blocking at the end of a compaction iteration w…

    …hen there are un-uploaded data (#2109)
    
    Signed-off-by: Shichao Nie <niesc@automq.com>
    SCNieh authored Nov 3, 2024
    Copy the full SHA
    412a99b View commit details
  2. fix(s3stream): wait force upload complete before return (#2112)

    Signed-off-by: Shichao Nie <niesc@automq.com>
    Co-authored-by: Shichao Nie <niesc@automq.com>
    superhx and SCNieh authored Nov 3, 2024
    Copy the full SHA
    8338f18 View commit details

Commits on Nov 5, 2024

  1. fix(compaction): prevent double release on compaction shutdown (#2115)

    Signed-off-by: Shichao Nie <niesc@automq.com>
    SCNieh authored Nov 5, 2024
    Copy the full SHA
    04b53f4 View commit details
  2. fix(s3stream): fix available bandwidth metrics (#2119)

    Signed-off-by: Shichao Nie <niesc@automq.com>
    SCNieh authored Nov 5, 2024
    Copy the full SHA
    4f93b92 View commit details

Commits on Nov 6, 2024

  1. fix(checkstyle): fix checkstyle (#2122)

    Signed-off-by: Shichao Nie <niesc@automq.com>
    SCNieh authored Nov 6, 2024
    Copy the full SHA
    0443529 View commit details
  2. fix(e2e): remove unstable autobalancer tests (#2124)

    Signed-off-by: Shichao Nie <niesc@automq.com>
    SCNieh authored Nov 6, 2024
    Copy the full SHA
    e53c837 View commit details
  3. Copy the full SHA
    08ca9ec View commit details

Commits on Nov 7, 2024

  1. perf(log): avoid too many checkpoint at the same time (#2130)

    Signed-off-by: Ning Yu <ningyu@automq.com>
    Chillax-0v0 authored Nov 7, 2024
    Copy the full SHA
    9c51a61 View commit details
  2. perf(tools/perf): assuming all partitions have the same offset at the…

    … same time (#2127) (#2128)
    
    * feat(tools/perf): log progress on resetting offsets
    
    
    
    * fix: reset timeouts
    
    
    
    * feat: increase the log interval
    
    
    
    * perf(tools/perf): assuming all partitions have the same offset at the same time
    
    
    
    * feat: limit the min of --backlog-duration
    
    
    
    ---------
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    Chillax-0v0 authored Nov 7, 2024
    Copy the full SHA
    42debe7 View commit details
  3. refactor(tools/perf): retry sending messages in when waiting topics r…

    …eady (#2133)
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    Chillax-0v0 authored Nov 7, 2024
    Copy the full SHA
    8f5fcc5 View commit details

Commits on Nov 8, 2024

  1. fix(s3stream/storage): correct if condition on awaitTermination (#2138

    )
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    Chillax-0v0 authored Nov 8, 2024
    Copy the full SHA
    73e8901 View commit details
  2. feat(tools/perf): run benchmark without consumer (#2135)

    Signed-off-by: Ning Yu <ningyu@automq.com>
    Chillax-0v0 authored Nov 8, 2024
    Copy the full SHA
    cbaf3a2 View commit details
  3. fix(issue2139): prevent read object info from closed ObjectReader (#2141

    )
    
    Signed-off-by: Shichao Nie <niesc@automq.com>
    SCNieh authored Nov 8, 2024
    Copy the full SHA
    90516c7 View commit details
  4. fix(issue2139): add computeIfAbsent atomic operation to AsyncLRUCache (

    …#2144)
    
    close #2139
    
    Signed-off-by: Shichao Nie <niesc@automq.com>
    SCNieh authored Nov 8, 2024
    Copy the full SHA
    a74cf69 View commit details
  5. fix(issue2140): remove override equals and hashCode method for Object…

    …Reader (#2147)
    
    close #2140
    
    Signed-off-by: Shichao Nie <niesc@automq.com>
    SCNieh authored Nov 8, 2024
    Copy the full SHA
    351e1e0 View commit details

Commits on Nov 13, 2024

  1. fix(issue2151): avoid using stale broker IPs for AutoBalancer consumer (

    #2152)
    
    close #2151
    
    Signed-off-by: Shichao Nie <niesc@automq.com>
    SCNieh authored Nov 13, 2024
    Copy the full SHA
    ab249e0 View commit details
  2. chore(github): update code owners (#2155)

    Signed-off-by: Ning Yu <ningyu@automq.com>
    Chillax-0v0 authored Nov 13, 2024
    Copy the full SHA
    5eea51d View commit details

Commits on Nov 14, 2024

  1. feat(quota): support to update broker request rate quota (#2158)

    * refactor(quota): refactor `maybeRecordAndGetThrottleTimeMs`
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * fix(quota): throttle the produce request whatever the acks is
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * refactor(quota): separate `Request` in `ClientQuotaManager` and `RequestRate` in `BrokerQuotaManager`
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * sytle: fix lint
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * feat(quota): support to update broker request rate quota
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * test(quota): test update quota
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    ---------
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    Chillax-0v0 authored Nov 14, 2024
    Copy the full SHA
    54ea5d4 View commit details
  2. feat(quota): support broker quota for slow fetch (#2160)

    * feat(quota): introduce `SLOW_FETCH` broker quota
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * feat(quota): add slow fetch quota
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * test(quota): test broker slow fetch quota
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * test(quota): test zero quota value
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    ---------
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    Chillax-0v0 authored Nov 14, 2024
    Copy the full SHA
    4597b74 View commit details

Commits on Nov 15, 2024

  1. feat(backpressure): back pressure by system load (#2161)

    * feat(backpressure): init backpressure module
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * feat(backpressure): implement `DefaultBackPressureManager`
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * test(backpressure): test `DefaultBackPressureManager`
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    ---------
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    Chillax-0v0 authored Nov 15, 2024
    Copy the full SHA
    bcdd7e1 View commit details
  2. refactor(backpressure): introduce interface Checker (#2162)

    Signed-off-by: Ning Yu <ningyu@automq.com>
    Chillax-0v0 authored Nov 15, 2024
    Copy the full SHA
    7d6c2c9 View commit details
  3. feat(quota): support to get current quota by type (#2163)

    Signed-off-by: Ning Yu <ningyu@automq.com>
    Chillax-0v0 authored Nov 15, 2024
    Copy the full SHA
    e2f0e95 View commit details

Commits on Nov 20, 2024

  1. feat(tools/perf): create topics in batch (#2166)

    Signed-off-by: Ning Yu <ningyu@automq.com>
    Chillax-0v0 authored Nov 20, 2024
    Copy the full SHA
    ff3b68e View commit details
  2. chore(backpressure): log it on back pressure (#2164)

    Signed-off-by: Ning Yu <ningyu@automq.com>
    Chillax-0v0 authored Nov 20, 2024
    Copy the full SHA
    3073399 View commit details
  3. feat(table): table topic aspect (#2167)

    * feat(table): table topic aspect
    
    Signed-off-by: Robin Han <hanxvdovehx@gmail.com>
    
    * chore(table): fix PR review
    
    Signed-off-by: Robin Han <hanxvdovehx@gmail.com>
    
    * fix(table): fix unit test
    
    Signed-off-by: Robin Han <hanxvdovehx@gmail.com>
    
    ---------
    
    Signed-off-by: Robin Han <hanxvdovehx@gmail.com>
    superhx authored Nov 20, 2024
    Copy the full SHA
    2e3d94a View commit details
  4. docs: Introducing AutoMQ Guru on Gurubase.io (#2159)

    Introducing AutoMQ Guru on Gurubase.io
    
    Signed-off-by: Kursat Aktas <kursat.ce@gmail.com>
    kursataktas authored Nov 20, 2024
    Copy the full SHA
    ef1520a View commit details

Commits on Nov 21, 2024

  1. chore(workflow): add spotless check (#2168)

    chore(workflow): spotless check
    
    Signed-off-by: Robin Han <hanxvdovehx@gmail.com>
    superhx authored Nov 21, 2024
    Copy the full SHA
    1e217ee View commit details

Commits on Nov 22, 2024

  1. fix(stream): release FetchResults if the subsequent fetch fails (#2172

    )
    
    * fix(stream): release `FetchResult`s if the subsequent fetch fails
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * revert: "fix(stream): release `FetchResult`s if the subsequent fetch fails"
    
    This reverts commit 5836a6a.
    
    * refactor: add the `FetchResult` into the list in order rather than in reverse order
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * fix: release `FetchResult`s if failed to fetch
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    ---------
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    Chillax-0v0 authored Nov 22, 2024
    Copy the full SHA
    e9bbf7a View commit details
  2. chore(stream): move asyncsemaphore to util (#2173)

    Signed-off-by: Robin Han <hanxvdovehx@gmail.com>
    superhx authored Nov 22, 2024
    Copy the full SHA
    d7b73f0 View commit details

Commits on Nov 25, 2024

  1. feat(quota): support to get current quota metric value... (#2170)

    * fix: fix logs
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * feat(quota): support to get current quota metric value
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * refactor(backpressure): remove `Regulator#minimize`
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * perf(quota): increase the max of broker quota throttle time
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * perf(backpressure): decrease cooldown time
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * perf(quota): increase the max of broker quota throttle time
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * docs: update comments
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    ---------
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    Chillax-0v0 authored Nov 25, 2024
    Copy the full SHA
    89f2c6b View commit details
  2. feat(quota): exclude internal client IDs from broker quota (#2179)

    * feat(quota): exclude internal client IDs from broker quota
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * feat(autobalancer): mark producers and consumers internal clients
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    ---------
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    Chillax-0v0 authored Nov 25, 2024
    Copy the full SHA
    018833e View commit details

Commits on Nov 26, 2024

  1. fix(quota): limit the max throttle time (#2180)

    Signed-off-by: Ning Yu <ningyu@automq.com>
    Chillax-0v0 authored Nov 26, 2024
    Copy the full SHA
    bfc0e61 View commit details
  2. fix(quota): check whether the client in white list before fetch (#2181)

    Signed-off-by: Ning Yu <ningyu@automq.com>
    Chillax-0v0 authored Nov 26, 2024
    Copy the full SHA
    57d8e5f View commit details
  3. chore(table): set table max.message.bytes to 20MiB (#2182)

    Signed-off-by: Robin Han <hanxvdovehx@gmail.com>
    superhx authored Nov 26, 2024
    Copy the full SHA
    64b3865 View commit details

Commits on Nov 28, 2024

  1. fix: use the "adjusted" maxSize in ElasticLogSegment#readAsync (#…

    …2184)
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    Chillax-0v0 authored Nov 28, 2024
    Copy the full SHA
    dab96d7 View commit details
  2. fix: release PooledMemoryRecords if it's dropped in the fetch sessi…

    …on (#2185)
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    Chillax-0v0 authored Nov 28, 2024
    Copy the full SHA
    2583617 View commit details
  3. feat(table): auto create table topic control topic (#2186)

    Signed-off-by: Robin Han <hanxvdovehx@gmail.com>
    superhx authored Nov 28, 2024
    Copy the full SHA
    fba35b6 View commit details

Commits on Nov 29, 2024

  1. fix(issues2193): retry 2 times to cover most of BlockNotContinuousExc…

    …eption (#2194)
    
    Signed-off-by: Robin Han <hanxvdovehx@gmail.com>
    superhx authored Nov 29, 2024
    Copy the full SHA
    d6641d7 View commit details

Commits on Dec 2, 2024

  1. feat(core): reuse unregistered node when requesting for next node id (#…

    …2200)
    
    * feat(core): reuse unregistered node when requesting for next node id (#2183)
    
    Signed-off-by: Shichao Nie <niesc@automq.com>
    
    * fix(core): fix getting duplicated node id
    
    Signed-off-by: Shichao Nie <niesc@automq.com>
    
    ---------
    
    Signed-off-by: Shichao Nie <niesc@automq.com>
    SCNieh authored Dec 2, 2024
    Copy the full SHA
    6217fb1 View commit details
  2. chore(bin): increase the gc log file count (#2202)

    Signed-off-by: Ning Yu <ningyu@automq.com>
    Chillax-0v0 authored Dec 2, 2024
    Copy the full SHA
    504ce4e View commit details
  3. feat(backpressure): add metrics (#2198)

    * feat(backpressure): log it on recovery from backpressure
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * feat: add metric fetch_limiter_waiting_task_num
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * feat: add metric fetch_limiter_timeout_count
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * feat: add metric fetch_limiter_time
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * feat: add metric back_pressure_state
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * feat: add metric broker_quota_limit
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * fix(backpressure): run checkers with fixed delay
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * style: fix lint
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * perf: drop too large values
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * refactor: record -1 for other states
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * test: fix tests
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    ---------
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    Chillax-0v0 authored Dec 2, 2024
    Copy the full SHA
    b470ba4 View commit details

Commits on Dec 3, 2024

  1. feat(backpressure): support dynamic configs (#2204)

    * feat(backpressure): make back pressure manager configurable
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * test: test diabled
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * refactor: move backpressure from s3stream to kafka.core
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * refactor: init `BackPressureManager` in `BrokerServer`
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * refactor: introduce `BackPressureConfig`
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * feat: make `BackPressureManager` reconfigurable
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * test: test reconfigurable
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * refactor: rename config key
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    * refactor: move metric "back_pressure_state" from s3stream to core
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    
    ---------
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    Chillax-0v0 authored Dec 3, 2024
    Copy the full SHA
    52eb245 View commit details
  2. fix(core): write next node id into image (#2206)

    Signed-off-by: Shichao Nie <niesc@automq.com>
    SCNieh authored Dec 3, 2024
    Copy the full SHA
    46c0e76 View commit details
  3. feat(backpressure): stop and remove all scheduled tasks on shutdown (#…

    …2207)
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    Chillax-0v0 authored Dec 3, 2024
    Copy the full SHA
    1cdce66 View commit details
  4. fix(backpressure): start before registering to dynamic configs (#2208)

    Signed-off-by: Ning Yu <ningyu@automq.com>
    Chillax-0v0 authored Dec 3, 2024
    Copy the full SHA
    eab1c07 View commit details
  5. fix(backpressure): fix metric value of back pressure state (#2209)

    fix: fix metric value of back pressure state
    
    Signed-off-by: Ning Yu <ningyu@automq.com>
    Chillax-0v0 authored Dec 3, 2024
    Copy the full SHA
    89ccf10 View commit details
  6. feat(config): add table topic conversion type configuration (#2203)

    * feat(config): add table topic conversion type configurations
    
    * feat(config): rename table topic type to schema type and update related configurations
    
    * feat(config): add table topic schema registry URL configuration and validation
    
    * test(config): add unit tests for ControllerConfigurationValidator table topic schema configuration
    
    * fix(tests): update exception type in ControllerConfigurationValidatorTableTest for schema validation
    
    * feat(config): polish code
    Gezi-lzq authored Dec 3, 2024
    Copy the full SHA
    af4dd1f View commit details

Commits on Dec 5, 2024

  1. chore(gradle): update aws version to 2.29.26 (#2210)

    Signed-off-by: Robin Han <hanxvdovehx@gmail.com>
    superhx authored Dec 5, 2024
    Copy the full SHA
    e95f608 View commit details
  2. fix(docker): fix docker compose quick start (#2213)

    Signed-off-by: Robin Han <hanxvdovehx@gmail.com>
    superhx authored Dec 5, 2024
    Copy the full SHA
    35522a8 View commit details
Showing with 2,870 additions and 614 deletions.
  1. +1 −1 .github/CODEOWNERS
  2. +1 −1 .github/workflows/build_automq.yml
  3. +5 −7 README.md
  4. +11 −0 automq-shell/src/main/java/com/automq/shell/commands/cluster/Deploy.java
  5. +2 −2 automq-shell/src/main/java/com/automq/shell/log/LogRecorder.java
  6. +2 −1 automq-shell/src/main/java/com/automq/shell/log/LogUploader.java
  7. +2 −1 automq-shell/src/main/java/com/automq/shell/metrics/S3MetricsExporter.java
  8. +61 −0 automq-shell/src/main/java/com/automq/shell/util/Utils.java
  9. +40 −0 automq-shell/src/test/java/com/automq/shell/util/UtilsTest.java
  10. +2 −2 bin/kafka-run-class.sh
  11. +8 −0 build.gradle
  12. +32 −11 clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
  13. +6 −0 clients/src/main/java/org/apache/kafka/common/internals/Topic.java
  14. +9 −0 clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
  15. +4 −0 clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaType.java
  16. +40 −22 core/src/main/java/kafka/autobalancer/LoadRetriever.java
  17. +1 −1 core/src/main/java/kafka/autobalancer/config/AutoBalancerControllerConfig.java
  18. +1 −1 core/src/main/java/kafka/autobalancer/config/AutoBalancerMetricsReporterConfig.java
  19. +2 −1 core/src/main/java/kafka/autobalancer/metricsreporter/AutoBalancerMetricsReporter.java
  20. +23 −4 core/src/main/java/kafka/automq/AutoMQConfig.java
  21. +87 −0 core/src/main/java/kafka/automq/backpressure/BackPressureConfig.java
  22. +37 −0 core/src/main/java/kafka/automq/backpressure/BackPressureManager.java
  23. +33 −0 core/src/main/java/kafka/automq/backpressure/Checker.java
  24. +171 −0 core/src/main/java/kafka/automq/backpressure/DefaultBackPressureManager.java
  25. +43 −0 core/src/main/java/kafka/automq/backpressure/LoadLevel.java
  26. +31 −0 core/src/main/java/kafka/automq/backpressure/Regulator.java
  27. +6 −2 core/src/main/java/kafka/automq/zonerouter/NoopProduceRouter.java
  28. +4 −1 core/src/main/java/kafka/automq/zonerouter/ProduceRouter.java
  29. +8 −1 core/src/main/resources/jmx/rules/broker.yaml
  30. +26 −2 core/src/main/scala/kafka/cluster/Partition.scala
  31. +21 −0 core/src/main/scala/kafka/cluster/PartitionAppendListener.java
  32. +13 −3 core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java
  33. +26 −10 core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java
  34. +5 −1 core/src/main/scala/kafka/log/streamaspect/ElasticLogSegment.java
  35. +28 −3 core/src/main/scala/kafka/log/streamaspect/ElasticUnifiedLog.scala
  36. +20 −0 core/src/main/scala/kafka/log/streamaspect/LogConfigChangeListener.java
  37. +6 −1 core/src/main/scala/kafka/log/streamaspect/MetaStream.java
  38. +44 −4 core/src/main/scala/kafka/network/RequestChannel.scala
  39. +4 −1 core/src/main/scala/kafka/network/SocketServer.scala
  40. +24 −1 core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
  41. +34 −2 core/src/main/scala/kafka/server/BrokerServer.scala
  42. +1 −1 core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
  43. +8 −3 core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
  44. +5 −1 core/src/main/scala/kafka/server/DelayedFetch.scala
  45. +11 −2 core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
  46. +42 −3 core/src/main/scala/kafka/server/FairLimiter.java
  47. +8 −0 core/src/main/scala/kafka/server/FetchSession.scala
  48. +9 −1 core/src/main/scala/kafka/server/KafkaApis.scala
  49. +35 −1 core/src/main/scala/kafka/server/KafkaConfig.scala
  50. +1 −1 core/src/main/scala/kafka/server/KafkaRequestHandler.scala
  51. +10 −0 core/src/main/scala/kafka/server/Limiter.java
  52. +10 −0 core/src/main/scala/kafka/server/NoopLimiter.java
  53. +22 −2 core/src/main/scala/kafka/server/QuotaFactory.scala
  54. +86 −36 core/src/main/scala/kafka/server/streamaspect/BrokerQuotaManager.scala
  55. +27 −16 core/src/main/scala/kafka/server/streamaspect/ElasticKafkaApis.scala
  56. +45 −10 core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala
  57. +22 −0 core/src/main/scala/kafka/server/streamaspect/PartitionLifecycleListener.java
  58. +73 −0 core/src/test/java/kafka/autobalancer/LoadRetrieverTest.java
  59. +187 −0 core/src/test/java/kafka/automq/backpressure/DefaultBackPressureManagerTest.java
  60. +46 −1 core/src/test/scala/kafka/log/streamaspect/ElasticLogCleanerTest.scala
  61. +99 −6 core/src/test/scala/unit/kafka/server/BrokerQuotaManagerTest.java
  62. +51 −0 core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTableTest.scala
  63. +25 −1 core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
  64. +10 −10 docker/docker-compose.yaml
  65. +5 −1 docker/scripts/start.sh
  66. +3 −2 gradle/dependencies.gradle
  67. +41 −0 metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
  68. +102 −11 metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
  69. +4 −1 metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
  70. +1 −0 metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
  71. +9 −10 metadata/src/main/java/org/apache/kafka/controller/stream/DefaultNodeRuntimeInfoGetter.java
  72. +20 −1 metadata/src/main/java/org/apache/kafka/controller/stream/NodeState.java
  73. +3 −1 metadata/src/main/java/org/apache/kafka/image/ClusterImage.java
  74. +40 −0 metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java
  75. +58 −0 metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
  76. +1 −0 metadata/src/test/java/org/apache/kafka/controller/stream/NodeControlManagerTest.java
  77. +12 −7 s3stream/src/main/java/com/automq/stream/s3/CompositeObjectReader.java
  78. +12 −8 s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java
  79. +1 −1 s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
  80. +7 −0 s3stream/src/main/java/com/automq/stream/s3/StreamDataBlock.java
  81. +21 −0 s3stream/src/main/java/com/automq/stream/s3/cache/AsyncLRUCache.java
  82. +1 −0 s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DataBlockCache.java
  83. +9 −6 s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/DefaultObjectReaderFactory.java
  84. +1 −1 s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReader.java
  85. +32 −10 s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java
  86. +7 −4 s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java
  87. +17 −0 s3stream/src/main/java/com/automq/stream/s3/metrics/NoopObservableDoubleGauge.java
  88. +4 −0 s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java
  89. +43 −4 s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java
  90. +1 −1 s3stream/src/main/java/com/automq/stream/s3/network/AsyncNetworkBandwidthLimiter.java
  91. +20 −20 s3stream/src/main/java/com/automq/stream/s3/operator/AbstractObjectStorage.java
  92. +23 −0 s3stream/src/main/java/com/automq/stream/s3/operator/ObjectStorage.java
  93. +22 −6 s3stream/src/main/java/com/automq/stream/s3/wal/util/WALUtil.java
  94. +2 −2 s3stream/src/main/java/com/automq/stream/{s3/cache/blockcache → utils}/AsyncSemaphore.java
  95. +9 −1 s3stream/src/main/java/com/automq/stream/utils/CommandUtils.java
  96. +32 −0 s3stream/src/main/java/com/automq/stream/utils/ThreadUtils.java
  97. +69 −0 s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java
  98. +8 −0 server-common/src/main/java/org/apache/kafka/server/config/QuotaConfigs.java
  99. +8 −0 server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java
  100. +7 −0 ...-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsConstants.java
  101. +83 −0 ...er-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsManager.java
  102. +41 −0 server-common/src/main/java/org/apache/kafka/server/record/TableTopicSchemaType.java
  103. +9 −0 server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java
  104. +26 −4 server/src/main/java/org/apache/kafka/server/config/BrokerQuotaManagerConfig.java
  105. +47 −0 storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
  106. +0 −255 tests/kafkatest/automq/autobalancer_test.py
  107. +88 −11 tools/src/main/java/org/apache/kafka/tools/automq/PerfCommand.java
  108. +67 −44 tools/src/main/java/org/apache/kafka/tools/automq/perf/ConsumerService.java
  109. +50 −2 tools/src/main/java/org/apache/kafka/tools/automq/perf/PerfConfig.java
  110. +11 −12 tools/src/main/java/org/apache/kafka/tools/automq/perf/ProducerService.java
  111. +36 −6 tools/src/main/java/org/apache/kafka/tools/automq/perf/TopicService.java
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Original file line number Diff line number Diff line change
@@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

* @superhx @SCNieh @ShadowySpirits @Chillax-0v0
* @superhx @SCNieh @Chillax-0v0 @Gezi-lzq
2 changes: 1 addition & 1 deletion .github/workflows/build_automq.yml
Original file line number Diff line number Diff line change
@@ -49,7 +49,7 @@ jobs:
- name: Setup Gradle
uses: gradle/gradle-build-action@v2.9.0
- name: Checkstyle
run: ./gradlew --build-cache rat checkstyleMain checkstyleTest
run: ./gradlew --build-cache rat checkstyleMain checkstyleTest spotlessJavaCheck
spotbugs:
name: "Spotbugs"
runs-on: ${{ matrix.os }}
12 changes: 5 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
@@ -24,18 +24,16 @@
[![](https://badgen.net/badge/Slack/Join%20AutoMQ/0abd59?icon=slack)](https://join.slack.com/t/automq/shared_invite/zt-29h17vye9-thf31ebIVL9oXuRdACnOIA)
[![](https://img.shields.io/badge/AutoMQ%20vs.%20Kafka(Cost)-yellow)](https://www.automq.com/blog/automq-vs-apache-kafka-a-real-aws-cloud-bill-comparison)
[![](https://img.shields.io/badge/AutoMQ%20vs.%20Kafka(Performance)-orange)](https://docs.automq.com/docs/automq-opensource/IJLQwnVROiS5cUkXfF0cuHnWnNd)
[![Gurubase](https://img.shields.io/badge/Gurubase-Ask%20AutoMQ%20Guru-006BFF)](https://gurubase.io/g/automq)

<a href="https://trendshift.io/repositories/9782" target="_blank"><img src="https://trendshift.io/api/badge/repositories/9782" alt="AutoMQ%2Fautomq | Trendshift" style="width: 250px; height: 55px;" width="250" height="55"/></a>

---

![](https://img.shields.io/badge/AWS-%E2%9C%85-lightgray?logo=amazonaws)
![](https://img.shields.io/badge/Google-%F0%9F%9A%A7-lightyellow?logo=googlecloud)
![](https://img.shields.io/badge/Azure-%F0%9F%9A%A7-lightyellow?logo=microsoftazure)
![](https://img.shields.io/badge/Google-%E2%9C%85-lightgray?logo=googlecloud)
![](https://img.shields.io/badge/Azure-%E2%9C%85-lightgray?logo=microsoftazure)
![](https://img.shields.io/badge/Aliyun-%E2%9C%85-lightgray?logo=alibabacloud)
![](https://img.shields.io/badge/Huawei-%E2%9C%85-lightgray?logo=huawei)
![](https://img.shields.io/badge/Baidu-%E2%9C%85-lightgray?logo=baidu)
![](https://img.shields.io/badge/Tencent-%E2%9C%85-lightgray?logo=tencentqq)
</div>

## 📺 Youtube Video Introduction
@@ -187,8 +185,8 @@ There are more deployment options available:
- [Deploy on Linux with 5 Nodes](https://docs.automq.com/docs/automq-opensource/IyXrw3lHriVPdQkQLDvcPGQdnNh)
- [Deploy on Kubernetes(Enterprise Edition Only)](https://docs.automq.com/docs/automq-opensource/KJtLwvdaPi7oznkX3lkcCR7fnte)
- [Runs on Ceph / MinIO / CubeFS / HDFS](https://docs.automq.com/docs/automq-opensource/RexrwfhKuiGChfk237QcEBIwnND)
- [Try AutoMQ on Alibaba Cloud Marketplace](https://market.aliyun.com/products/55530001/cmgj00065841.html)
- [Try AutoMQ on AWS Marketplace](https://docs.automq.com/automq-cloud/getting-started/install-byoc-environment/aws/install-env-from-marketplace)
- [Try AutoMQ on Alibaba Cloud Marketplace (Two Weeks Free Trial)](https://market.aliyun.com/products/55530001/cmgj00065841.html)
- [Try AutoMQ on AWS Marketplace (Two Weeks Free Trial)](https://docs.automq.com/automq-cloud/getting-started/install-byoc-environment/aws/install-env-from-marketplace)

## 💬 Community
You can join the following groups or channels to discuss or ask questions about AutoMQ:
Original file line number Diff line number Diff line change
@@ -159,6 +159,7 @@ private static void appendCommonConfigsOverride(StringBuilder sb, ClusterTopolog
sb.append("--override cluster.id=").append(topo.getGlobal().getClusterId()).append(" ");
sb.append("--override node.id=").append(node.getNodeId()).append(" ");
sb.append("--override controller.quorum.voters=").append(getQuorumVoters(topo)).append(" ");
sb.append("--override controller.quorum.bootstrap.servers=").append(getBootstrapServers(topo)).append(" ");
sb.append("--override advertised.listeners=").append("PLAINTEXT://").append(node.getHost()).append(":9092").append(" ");
}

@@ -181,4 +182,14 @@ private static String getQuorumVoters(ClusterTopology topo) {
.map(node -> node.getNodeId() + "@" + node.getHost() + ":9093")
.collect(Collectors.joining(","));
}

private static String getBootstrapServers(ClusterTopology topo) {
List<Node> nodes = topo.getControllers();
if (!(nodes.size() == 1 || nodes.size() == 3)) {
throw new IllegalArgumentException("Only support 1 or 3 controllers");
}
return nodes.stream()
.map(node -> node.getHost() + ":9093")
.collect(Collectors.joining(","));
}
}
Original file line number Diff line number Diff line change
@@ -39,10 +39,10 @@ public void validate() {
throw new IllegalArgumentException("Level cannot be blank");
}
if (StringUtils.isBlank(logger)) {
throw new IllegalArgumentException("Level cannot be blank");
throw new IllegalArgumentException("Logger cannot be blank");
}
if (StringUtils.isBlank(message)) {
throw new IllegalArgumentException("Level cannot be blank");
throw new IllegalArgumentException("Message cannot be blank");
}
}

Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@
package com.automq.shell.log;

import com.automq.shell.AutoMQApplication;
import com.automq.shell.util.Utils;
import com.automq.stream.s3.operator.ObjectStorage;
import com.automq.stream.s3.operator.ObjectStorage.ObjectInfo;
import com.automq.stream.s3.operator.ObjectStorage.ObjectPath;
@@ -204,7 +205,7 @@ private void upload(long now) {

try {
String objectKey = getObjectKey();
objectStorage.write(WriteOptions.DEFAULT, objectKey, uploadBuffer.retainedSlice().asReadOnly()).get();
objectStorage.write(WriteOptions.DEFAULT, objectKey, Utils.compress(uploadBuffer.slice().asReadOnly())).get();
break;
} catch (Exception e) {
e.printStackTrace(System.err);
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@

package com.automq.shell.metrics;

import com.automq.shell.util.Utils;
import com.automq.stream.s3.operator.ObjectStorage;
import com.automq.stream.s3.operator.ObjectStorage.ObjectInfo;
import com.automq.stream.s3.operator.ObjectStorage.ObjectPath;
@@ -242,7 +243,7 @@ public CompletableResultCode flush() {
synchronized (uploadBuffer) {
if (uploadBuffer.readableBytes() > 0) {
try {
objectStorage.write(WriteOptions.DEFAULT, getObjectKey(), uploadBuffer.retainedSlice().asReadOnly()).get();
objectStorage.write(WriteOptions.DEFAULT, getObjectKey(), Utils.compress(uploadBuffer.slice().asReadOnly())).get();
} catch (Exception e) {
LOGGER.error("Failed to upload metrics to s3", e);
return CompletableResultCode.ofFailure();
61 changes: 61 additions & 0 deletions automq-shell/src/main/java/com/automq/shell/util/Utils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2024, AutoMQ HK Limited.
*
* The use of this file is governed by the Business Source License,
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package com.automq.shell.util;

import com.automq.stream.s3.ByteBufAlloc;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import io.netty.buffer.ByteBuf;

public class Utils {

public static ByteBuf compress(ByteBuf input) throws IOException {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream);

byte[] buffer = new byte[input.readableBytes()];
input.readBytes(buffer);
gzipOutputStream.write(buffer);
gzipOutputStream.close();

ByteBuf compressed = ByteBufAlloc.byteBuffer(byteArrayOutputStream.size());
compressed.writeBytes(byteArrayOutputStream.toByteArray());
return compressed;
}

public static ByteBuf decompress(ByteBuf input) throws IOException {
byte[] compressedData = new byte[input.readableBytes()];
input.readBytes(compressedData);
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(compressedData);
GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream);

ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
byte[] buffer = new byte[1024];
int bytesRead;
while ((bytesRead = gzipInputStream.read(buffer)) != -1) {
byteArrayOutputStream.write(buffer, 0, bytesRead);
}

gzipInputStream.close();
byteArrayOutputStream.close();

byte[] uncompressedData = byteArrayOutputStream.toByteArray();
ByteBuf output = ByteBufAlloc.byteBuffer(uncompressedData.length);
output.writeBytes(uncompressedData);
return output;
}
}
40 changes: 40 additions & 0 deletions automq-shell/src/test/java/com/automq/shell/util/UtilsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2024, AutoMQ HK Limited.
*
* The use of this file is governed by the Business Source License,
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package com.automq.shell.util;

import com.automq.stream.s3.ByteBufAlloc;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import io.netty.buffer.ByteBuf;

@Tag("S3Unit")
public class UtilsTest {

@Test
public void testCompression() {
String testStr = "This is a test string";
ByteBuf input = ByteBufAlloc.byteBuffer(testStr.length());
input.writeBytes(testStr.getBytes());
try {
ByteBuf compressed = Utils.compress(input);
ByteBuf decompressed = Utils.decompress(compressed);
String decompressedStr = decompressed.toString(io.netty.util.CharsetUtil.UTF_8);
System.out.printf("Original: %s, Decompressed: %s\n", testStr, decompressedStr);
Assertions.assertEquals(testStr, decompressedStr);
} catch (Exception e) {
Assertions.fail("Exception occurred during compression/decompression: " + e.getMessage());
}
}
}
4 changes: 2 additions & 2 deletions bin/kafka-run-class.sh
Original file line number Diff line number Diff line change
@@ -329,9 +329,9 @@ if [ "x$GC_LOG_ENABLED" = "xtrue" ]; then
# We need to match to the end of the line to prevent sed from printing the characters that do not match
# JAVA_MAJOR_VERSION=$("$JAVA" -version 2>&1 | sed -E -n 's/.* version "([0-9]*).*$/\1/p')
if [[ "$JAVA_MAJOR_VERSION" -ge "9" ]] ; then
KAFKA_GC_LOG_OPTS="-Xlog:gc*:file=$LOG_DIR/$GC_LOG_FILE_NAME:time,tags:filecount=10,filesize=100M"
KAFKA_GC_LOG_OPTS="-Xlog:gc*:file=$LOG_DIR/$GC_LOG_FILE_NAME:time,tags:filecount=32,filesize=32M"
else
KAFKA_GC_LOG_OPTS="-Xloggc:$LOG_DIR/$GC_LOG_FILE_NAME -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M"
KAFKA_GC_LOG_OPTS="-Xloggc:$LOG_DIR/$GC_LOG_FILE_NAME -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=32 -XX:GCLogFileSize=32M"
fi
fi

8 changes: 8 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -128,6 +128,9 @@ allprojects {

repositories {
mavenCentral()
maven {
url = uri("https://packages.confluent.io/maven/")
}
}

dependencyUpdates {
@@ -2226,7 +2229,11 @@ project(':tools') {
implementation (project(':log4j-appender')){
exclude group: 'org.slf4j', module: '*'
}
// AutoMQ inject start
implementation project(':automq-shell')
implementation libs.kafkaAvroSerializer
implementation libs.bucket4j
// AutoMQ inject end

implementation project(':storage')
implementation project(':connect:runtime')
@@ -2242,6 +2249,7 @@ project(':tools') {
implementation libs.awsSdkAuth
implementation libs.hdrHistogram
implementation libs.spotbugsAnnotations
implementation libs.guava

// for SASL/OAUTHBEARER JWT validation
implementation (libs.jose4j){
Original file line number Diff line number Diff line change
@@ -81,17 +81,17 @@ public class TopicConfig {

public static final String REMOTE_LOG_STORAGE_ENABLE_CONFIG = "remote.storage.enable";
public static final String REMOTE_LOG_STORAGE_ENABLE_DOC = "To enable tiered storage for a topic, set this configuration as true. " +
"You can not disable this config once it is enabled. It will be provided in future versions.";
"You can not disable this config once it is enabled. It will be provided in future versions.";

public static final String LOCAL_LOG_RETENTION_MS_CONFIG = "local.retention.ms";
public static final String LOCAL_LOG_RETENTION_MS_DOC = "The number of milliseconds to keep the local log segment before it gets deleted. " +
"Default value is -2, it represents `retention.ms` value is to be used. The effective value should always be less than or equal " +
"to `retention.ms` value.";
"Default value is -2, it represents `retention.ms` value is to be used. The effective value should always be less than or equal " +
"to `retention.ms` value.";

public static final String LOCAL_LOG_RETENTION_BYTES_CONFIG = "local.retention.bytes";
public static final String LOCAL_LOG_RETENTION_BYTES_DOC = "The maximum size of local log segments that can grow for a partition before it " +
"deletes the old segments. Default value is -2, it represents `retention.bytes` value to be used. The effective value should always be " +
"less than or equal to `retention.bytes` value.";
"deletes the old segments. Default value is -2, it represents `retention.bytes` value to be used. The effective value should always be " +
"less than or equal to `retention.bytes` value.";

public static final String REMOTE_LOG_DISABLE_POLICY_RETAIN = "retain";
public static final String REMOTE_LOG_DISABLE_POLICY_DELETE = "delete";
@@ -103,16 +103,16 @@ public class TopicConfig {
"selected then all data in remote will be kept post-disablement and will only be deleted when it breaches expiration " +
"thresholds. If %s is selected then the data will be made inaccessible immediately by advancing the log start offset and will be " +
"deleted asynchronously.", REMOTE_LOG_DISABLE_POLICY_RETAIN, REMOTE_LOG_DISABLE_POLICY_DELETE,
REMOTE_LOG_DISABLE_POLICY_RETAIN, REMOTE_LOG_DISABLE_POLICY_DELETE);
REMOTE_LOG_DISABLE_POLICY_RETAIN, REMOTE_LOG_DISABLE_POLICY_DELETE);

public static final String MAX_MESSAGE_BYTES_CONFIG = "max.message.bytes";
public static final String MAX_MESSAGE_BYTES_DOC =
"The largest record batch size allowed by Kafka (after compression if compression is enabled). " +
"If this is increased and there are consumers older than 0.10.2, the consumers' fetch " +
"size must also be increased so that they can fetch record batches this large. " +
"In the latest message format version, records are always grouped into batches for efficiency. " +
"In previous message format versions, uncompressed records are not grouped into batches and this " +
"limit only applies to a single record in that case.";
"If this is increased and there are consumers older than 0.10.2, the consumers' fetch " +
"size must also be increased so that they can fetch record batches this large. " +
"In the latest message format version, records are always grouped into batches for efficiency. " +
"In previous message format versions, uncompressed records are not grouped into batches and this " +
"limit only applies to a single record in that case.";

public static final String INDEX_INTERVAL_BYTES_CONFIG = "index.interval.bytes";
public static final String INDEX_INTERVAL_BYTES_DOC = "This setting controls how frequently " +
@@ -256,4 +256,25 @@ public class TopicConfig {
"broker will not perform down-conversion for consumers expecting an older message format. The broker responds " +
"with <code>UNSUPPORTED_VERSION</code> error for consume requests from such older clients. This configuration" +
"does not apply to any message format conversion that might be required for replication to followers.";

// AutoMQ inject start
public static final String TABLE_TOPIC_ENABLE_CONFIG = "automq.table.topic.enable";
public static final String TABLE_TOPIC_ENABLE_DOC = "The configuration controls whether enable table topic";
public static final String TABLE_TOPIC_COMMIT_INTERVAL_CONFIG = "automq.table.topic.commit.interval.ms";
public static final String TABLE_TOPIC_COMMIT_INTERVAL_DOC = "The table topic commit interval(ms)";
public static final String TABLE_TOPIC_NAMESPACE_CONFIG = "automq.table.topic.namespace";
public static final String TABLE_TOPIC_NAMESPACE_DOC = "The table topic table namespace";
public static final String TABLE_TOPIC_SCHEMA_TYPE_CONFIG = "automq.table.topic.schema.type";
public static final String TABLE_TOPIC_SCHEMA_TYPE_DOC = "The table topic schema type, support schemaless, schema";
public static final String TABLE_TOPIC_ID_COLUMNS_CONFIG = "automq.table.topic.id.columns";
public static final String TABLE_TOPIC_ID_COLUMNS_DOC = "The primary key, comma-separated list of columns that identify a row in tables."
+ "ex. [region, name]";
public static final String TABLE_TOPIC_PARTITION_BY_CONFIG = "automq.table.topic.partition.by";
public static final String TABLE_TOPIC_PARTITION_BY_DOC = "The partition fields of the table. ex. [bucket(name), month(timestamp)]";
public static final String TABLE_TOPIC_UPSERT_ENABLE_CONFIG = "automq.table.topic.upsert.enable";
public static final String TABLE_TOPIC_UPSERT_ENABLE_DOC = "The configuration controls whether enable table topic upsert";
public static final String TABLE_TOPIC_CDC_FIELD_CONFIG = "automq.table.topic.cdc.field";
public static final String TABLE_TOPIC_CDC_FIELD_DOC = "The name of the field containing the CDC operation, I, U, or D";
// AutoMQ inject end

}
Original file line number Diff line number Diff line change
@@ -30,7 +30,13 @@ public class Topic {
public static final String TRANSACTION_STATE_TOPIC_NAME = "__transaction_state";
public static final String SHARE_GROUP_STATE_TOPIC_NAME = "__share_group_state";
public static final String CLUSTER_METADATA_TOPIC_NAME = "__cluster_metadata";

// AutoMQ inject start
public static final String AUTO_BALANCER_METRICS_TOPIC_NAME = "__auto_balancer_metrics";
public static final String TABLE_TOPIC_CONTROL_TOPIC_NAME = "__automq_table_control";
public static final String TABLE_TOPIC_DATA_TOPIC_NAME = "__automq_table_data";
// AutoMQ inject end

public static final TopicPartition CLUSTER_METADATA_TOPIC_PARTITION = new TopicPartition(
CLUSTER_METADATA_TOPIC_NAME,
0
Original file line number Diff line number Diff line change
@@ -129,4 +129,13 @@ public void config(MetricConfig config) {
this.config = config;
}
}

// AutoMQ inject start
/**
* A public method to expose the {@link #measurableValue} method.
*/
public double measurableValueV2(long timeMs) {
return measurableValue(timeMs);
}
// AutoMQ inject end
}
Original file line number Diff line number Diff line change
@@ -23,5 +23,9 @@ public enum ClientQuotaType {
PRODUCE,
FETCH,
REQUEST,
// AutoMQ for Kafka inject start
SLOW_FETCH,
REQUEST_RATE,
// AutoMQ for Kafka inject end
CONTROLLER_MUTATION
}
Loading