NAME
    EV::Kafka - High-performance asynchronous Kafka/Redpanda client using EV

SYNOPSIS
        use EV::Kafka;

        my $kafka = EV::Kafka->new(
            brokers  => '127.0.0.1:9092',
            acks     => -1,
            on_error => sub { warn "kafka: @_" },
            on_message => sub {
                my ($topic, $partition, $offset, $key, $value, $headers) = @_;
                print "$topic:$partition @ $offset  $key = $value\n";
            },
        );

        # Producer
        $kafka->connect(sub {
            $kafka->produce('my-topic', 'key', 'value', sub {
                my ($result, $err) = @_;
                print "produced at offset ",
                    $result->{topics}[0]{partitions}[0]{base_offset}, "\n";
            });
        });

        # Consumer (manual assignment)
        $kafka->assign([{ topic => 'my-topic', partition => 0, offset => 0 }]);
        my $poll = EV::timer 0, 0.1, sub { $kafka->poll };

        # Consumer group
        $kafka->subscribe('my-topic',
            group_id  => 'my-group',
            on_assign => sub { ... },
            on_revoke => sub { ... },
        );

        EV::run;

DESCRIPTION
    EV::Kafka is a high-performance asynchronous Kafka client that
    implements the Kafka binary protocol in XS with EV event loop
    integration. It targets Redpanda and Apache Kafka (protocol version
    0.11+).

    Two-layer architecture:

    *   EV::Kafka::Conn (XS) -- single broker TCP connection with protocol
        encoding/decoding, correlation ID matching, pipelining, optional TLS
        and SASL/PLAIN authentication.

    *   EV::Kafka::Client (Perl) -- cluster management with metadata
        discovery, broker connection pooling, partition leader routing,
        producer with key-based partitioning, consumer with manual
        assignment or consumer groups.

    Features:

    *   Binary protocol implemented in pure XS (no librdkafka dependency)

    *   Automatic request pipelining per broker connection

    *   Metadata-driven partition leader routing

    *   Producer: acks modes (-1/0/1), key-based partitioning (murmur2),
        headers, fire-and-forget (acks=0)

    *   Consumer: manual partition assignment, offset tracking, poll-based
        message delivery

    *   Consumer groups: JoinGroup/SyncGroup/Heartbeat, sticky partition
        assignment, offset commit/fetch, automatic rebalancing

    *   TLS (OpenSSL) and SASL/PLAIN authentication

    *   Automatic reconnection at the connection layer

    *   Bootstrap broker failover (tries all listed brokers)

ANYEVENT INTEGRATION
    AnyEvent uses EV as one of its backends, so EV::Kafka integrates
    seamlessly into AnyEvent applications: both run on the same EV event
    loop.
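
    As a sketch (AnyEvent picks its EV backend automatically when EV is
    loaded):

        use EV;
        use AnyEvent;
        use EV::Kafka;

        my $cv = AnyEvent->condvar;

        my $kafka = EV::Kafka->new(brokers => '127.0.0.1:9092');
        $kafka->connect(sub {
            $kafka->produce('my-topic', 'k', 'v', sub { $cv->send });
        });

        $cv->recv;   # drives the shared EV loop until the produce is acked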

NO UTF-8 SUPPORT
    This module handles all values as bytes. Encode your UTF-8 strings
    before passing them:

        use Encode;

        $kafka->produce($topic, $key, encode_utf8($val), sub { ... });

CLUSTER CLIENT METHODS
  new(%options)
    Create a new EV::Kafka client. Returns a blessed "EV::Kafka::Client"
    object.

        my $kafka = EV::Kafka->new(
            brokers  => '10.0.0.1:9092,10.0.0.2:9092',
            acks     => -1,
            on_error => sub { warn @_ },
        );

    Options:

    brokers => 'Str'
        Comma-separated list of bootstrap broker addresses (host:port).
        Default: "127.0.0.1:9092".

    client_id => 'Str' (default 'ev-kafka')
        Client identifier sent to brokers.

    tls => Bool
        Enable TLS encryption.

    tls_ca_file => 'Str'
        Path to CA certificate file for TLS verification.

    tls_skip_verify => Bool
        Skip TLS certificate verification.

    sasl => \%opts
        Enable SASL authentication. Supports PLAIN mechanism:

            sasl => { mechanism => 'PLAIN', username => 'user', password => 'pass' }

    acks => Int (default -1)
        Producer acknowledgment mode. -1 = all in-sync replicas, 0 = no
        acknowledgment (fire-and-forget), 1 = leader only.

    linger_ms => Int (default 5)
        Time in milliseconds to accumulate records before flushing a batch.
        Lower values reduce latency; higher values improve throughput.

    batch_size => Int (default 16384)
        Maximum batch size in bytes before a batch is flushed immediately.
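
        For example, a bulk loader might trade latency for larger batches
        (illustrative values, not tuned recommendations):

            my $bulk_loader = EV::Kafka->new(
                brokers    => '127.0.0.1:9092',
                linger_ms  => 50,      # accumulate records longer
                batch_size => 262144,  # flush at 256 KiB
            );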

    compression => 'Str'
        Compression type for produce batches: 'lz4' (requires liblz4),
        'gzip' (requires zlib), or "undef" for none.

    idempotent => Bool (default 0)
        Enable idempotent producer. Calls "InitProducerId" on connect and
        sets producer_id/epoch/sequence in each RecordBatch for exactly-once
        delivery (broker-side deduplication).

    transactional_id => 'Str'
        Enable transactional producer. Implies idempotent. Required for
        "begin_transaction"/"commit_transaction"/"abort_transaction" and
        "send_offsets_to_transaction" (full EOS).

    partitioner => $cb->($topic, $key, $num_partitions)
        Custom partition selection function. Default: murmur2 hash of key,
        or round-robin for null keys.
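
        For example, a sketch that pins keys with an assumed "audit:"
        prefix to partition 0 and hashes everything else like the default:

            partitioner => sub {
                my ($topic, $key, $num_partitions) = @_;
                return 0 if defined $key && $key =~ /^audit:/;
                return EV::Kafka::_murmur2($key // '') % $num_partitions;
            },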

    on_error => $cb->($errstr)
        Error callback. Default: "die".

    on_connect => $cb->()
        Called once after initial metadata fetch completes.

    on_message => $cb->($topic, $partition, $offset, $key, $value, $headers)
        Message delivery callback for consumer operations.

    fetch_max_wait_ms => Int (default 500)
        Maximum time the broker waits for "fetch_min_bytes" of data.

    fetch_max_bytes => Int (default 1048576)
        Maximum bytes per fetch response.

    fetch_min_bytes => Int (default 1)
        Minimum bytes before the broker responds to a fetch.

    metadata_refresh => Int (default 300)
        Metadata refresh interval in seconds (reserved, not yet wired).

    loop => EV::Loop
        EV loop to use. Default: "EV::default_loop".

  connect($cb)
    Connect to the cluster. Connects to the first available bootstrap
    broker, fetches cluster metadata, then fires "$cb->($metadata)".

        $kafka->connect(sub {
            my $meta = shift;
            # $meta->{brokers}, $meta->{topics}
        });

  produce($topic, $key, $value, [\%opts,] [$cb])
    Produce a message. Routes to the correct partition leader automatically.

        # with callback (acks=1 or acks=-1)
        $kafka->produce('topic', 'key', 'value', sub {
            my ($result, $err) = @_;
        });

        # with headers
        $kafka->produce('topic', 'key', 'value',
            { headers => { 'h1' => 'v1' } }, sub { ... });

        # fire-and-forget (acks=0)
        $kafka->produce('topic', 'key', 'value');

        # explicit partition
        $kafka->produce('topic', 'key', 'value',
            { partition => 3 }, sub { ... });

  produce_many(\@messages, $cb)
    Produce multiple messages with a single completion callback. Each
    message is an arrayref "[$topic, $key, $value]" or a hashref "{topic,
    key, value}". $cb fires when all messages are acknowledged.

        $kafka->produce_many([
            ['my-topic', 'k1', 'v1'],
            ['my-topic', 'k2', 'v2'],
        ], sub {
            my $errors = shift;
            warn "some failed: @$errors" if $errors;
        });

  flush([$cb])
    Flush all accumulated produce batches and wait for all in-flight
    requests to complete. $cb fires when all pending responses have been
    received.
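
    A typical use is draining buffered batches before shutdown (sketch;
    @samples is an assumed list of payloads):

        $kafka->produce('metrics', undef, $_) for @samples;
        $kafka->flush(sub { print "all batches flushed\n" });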

  assign(\@partitions)
    Manually assign partitions for consuming.

        $kafka->assign([
            { topic => 'my-topic', partition => 0, offset => 0 },
            { topic => 'my-topic', partition => 1, offset => 100 },
        ]);

  seek($topic, $partition, $offset, [$cb])
    Seek a partition to a specific offset. Use -2 for earliest, -1 for
    latest. Updates the assignment in-place.

        $kafka->seek('my-topic', 0, -1, sub { print "at latest\n" });

  offsets_for($topic, $cb)
    Get earliest and latest offsets for all partitions of a topic.

        $kafka->offsets_for('my-topic', sub {
            my $offsets = shift;
            # { 0 => { earliest => 0, latest => 42 }, 1 => ... }
        });

  lag($cb)
    Get consumer lag for all assigned partitions.

        $kafka->lag(sub {
            my $lag = shift;
            # { "topic:0" => { current => 10, latest => 42, lag => 32 } }
        });

  error_name($code)
    Convert a Kafka numeric error code to its name.

        EV::Kafka::Client::error_name(3)  # "UNKNOWN_TOPIC_OR_PARTITION"

  poll([$cb])
    Fetch messages from assigned partitions. Calls "on_message" for each
    received record. $cb fires when all fetch responses have arrived.

        my $timer = EV::timer 0, 0.1, sub { $kafka->poll };

  subscribe($topic, ..., %opts)
    Join a consumer group and subscribe to topics. The group protocol
    handles partition assignment automatically.

        $kafka->subscribe('topic-a', 'topic-b',
            group_id           => 'my-group',
            session_timeout    => 30000,      # ms
            rebalance_timeout  => 60000,      # ms
            heartbeat_interval => 3,          # seconds
            auto_commit        => 1,          # commit on unsubscribe (default)
            auto_offset_reset  => 'earliest', # or 'latest'
            group_instance_id  => 'pod-abc', # KIP-345 static membership
            on_assign => sub {
                my $partitions = shift;
                # [{topic, partition, offset}, ...]
            },
            on_revoke => sub {
                my $partitions = shift;
            },
        );

  commit([$cb])
    Commit current consumer offsets to the group coordinator.

        $kafka->commit(sub {
            my $err = shift;
            warn "commit failed: $err" if $err;
        });

  unsubscribe([$cb])
    Leave the consumer group (sends LeaveGroup for fast rebalance), stop
    heartbeat and fetch loop. If "auto_commit" is enabled, commits offsets
    before leaving.

  begin_transaction
    Start a transaction. Requires "transactional_id" in constructor.

  send_offsets_to_transaction($group_id, [$cb])
    Commit consumer offsets within the current transaction via
    "TxnOffsetCommit". This is the key step for exactly-once
    consume-process-produce pipelines.

        $kafka->send_offsets_to_transaction('my-group', sub {
            my ($result, $err) = @_;
        });

  commit_transaction([$cb])
    Commit the current transaction. All produced messages and offset commits
    within the transaction become visible atomically.

  abort_transaction([$cb])
    Abort the current transaction. All produced messages are discarded and
    offset commits are rolled back.

  close([$cb])
    Graceful shutdown: stop timers, disconnect all broker connections.

        $kafka->close(sub { EV::break });

LOW-LEVEL CONNECTION METHODS
    "EV::Kafka::Conn" provides direct access to a single broker connection.
    Useful for custom protocols, debugging, or when cluster-level routing is
    not needed.

        my $conn = EV::Kafka::Conn::_new('EV::Kafka::Conn', undef);
        $conn->on_error(sub { warn @_ });
        $conn->on_connect(sub { ... });
        $conn->connect('127.0.0.1', 9092, 5.0);

  connect($host, $port, [$timeout])
    Connect to a broker. Timeout in seconds (0 = no timeout).

  disconnect
    Disconnect from broker.

  connected
    Returns true if the connection is ready (ApiVersions handshake
    complete).

  metadata(\@topics, $cb)
    Request cluster metadata. Pass "undef" for all topics.

        $conn->metadata(['my-topic'], sub {
            my ($result, $err) = @_;
            # $result->{brokers}, $result->{topics}
        });

  produce($topic, $partition, $key, $value, [\%opts,] [$cb])
    Produce a message to a specific partition.

        $conn->produce('topic', 0, 'key', 'value', sub {
            my ($result, $err) = @_;
        });

    Options: "acks" (default 1), "headers" (hashref), "timestamp" (epoch ms,
    default now), "compression" ('none', 'lz4'; requires LZ4 at build time).

  produce_batch($topic, $partition, \@records, [\%opts,] [$cb])
    Produce multiple records in a single RecordBatch. Each record is "{key,
    value, headers}". Options: "acks", "compression", "producer_id",
    "producer_epoch", "base_sequence".

        $conn->produce_batch('topic', 0, [
            { key => 'k1', value => 'v1' },
            { key => 'k2', value => 'v2' },
        ], sub { my ($result, $err) = @_ });

  fetch($topic, $partition, $offset, $cb, [$max_bytes])
    Fetch messages from a partition starting at $offset.

        $conn->fetch('topic', 0, 0, sub {
            my ($result, $err) = @_;
            for my $rec (@{ $result->{topics}[0]{partitions}[0]{records} }) {
                printf "offset=%d key=%s value=%s\n",
                    $rec->{offset}, $rec->{key}, $rec->{value};
            }
        });

  fetch_multi(\%topics, $cb, [$max_bytes])
    Multi-partition fetch in a single request. Groups multiple
    topic-partitions into one Fetch call to the broker.

        $conn->fetch_multi({
            'topic-a' => [{ partition => 0, offset => 10 },
                          { partition => 1, offset => 20 }],
            'topic-b' => [{ partition => 0, offset => 0 }],
        }, sub { my ($result, $err) = @_ });

    Used internally by poll() to batch fetches by broker leader.

  list_offsets($topic, $partition, $timestamp, $cb)
    Get offsets by timestamp. Use -2 for earliest, -1 for latest.
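
    For example (sketch):

        $conn->list_offsets('my-topic', 0, -1, sub {
            my ($result, $err) = @_;
            # resolves the latest offset for my-topic partition 0
        });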

  find_coordinator($key, $cb, [$key_type])
    Find the coordinator broker. $key_type: 0=group (default),
    1=transaction.
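
    For example (sketch):

        $conn->find_coordinator('my-group', sub {
            my ($result, $err) = @_;
            # coordinator broker for the group; offset commits go there
        }, 0);   # key_type 0 = group coordinator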

  join_group($group_id, $member_id, \@topics, $cb, [$session_timeout_ms, $rebalance_timeout_ms, $group_instance_id])
    Join a consumer group. Pass $group_instance_id for KIP-345 static
    membership.

  sync_group($group_id, $generation_id, $member_id, \@assignments, $cb, [$group_instance_id])
    Synchronize group state after join.

  heartbeat($group_id, $generation_id, $member_id, $cb, [$group_instance_id])
    Send heartbeat to group coordinator.

  offset_commit($group_id, $generation_id, $member_id, \@offsets, $cb)
    Commit consumer offsets.

  offset_fetch($group_id, \@topics, $cb)
    Fetch committed offsets for a consumer group.
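
    For example (sketch):

        $conn->offset_fetch('my-group', ['my-topic'], sub {
            my ($result, $err) = @_;
            # committed offsets for my-topic in group my-group
        });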

  api_versions
    Returns a hashref of supported API keys to max versions, or undef if not
    yet negotiated.

        my $vers = $conn->api_versions;
        # { 0 => 7, 1 => 11, 3 => 8, ... }

  on_error([$cb])
  on_connect([$cb])
  on_disconnect([$cb])
    Set handler callbacks. Pass "undef" to clear.

  client_id($id)
    Set the client identifier.

  tls($enable, [$ca_file, $skip_verify])
    Configure TLS.

  sasl($mechanism, [$username, $password])
    Configure SASL authentication.

  auto_reconnect($enable, [$delay_ms])
    Enable automatic reconnection with delay in milliseconds (default 1000).

  leave_group($group_id, $member_id, $cb)
    Send LeaveGroup to coordinator for fast partition rebalance.

  create_topics(\@topics, $timeout_ms, $cb)
    Create topics. Each element: "{name, num_partitions,
    replication_factor}".

        $conn->create_topics(
            [{ name => 'new-topic', num_partitions => 3, replication_factor => 1 }],
            5000, sub { my ($res, $err) = @_ }
        );

  delete_topics(\@topic_names, $timeout_ms, $cb)
    Delete topics by name.
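
    For example (sketch):

        $conn->delete_topics(['old-topic'], 5000,
            sub { my ($res, $err) = @_ });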

  init_producer_id($transactional_id, $txn_timeout_ms, $cb)
    Initialize a producer ID for idempotent/transactional produce. Pass
    "undef" for non-transactional idempotent producer.
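
    For example, a non-transactional idempotent setup (sketch; the
    response carries the broker-assigned producer id and epoch):

        $conn->init_producer_id(undef, 60000, sub {
            my ($result, $err) = @_;
            # pass the returned id/epoch to produce_batch via the
            # producer_id / producer_epoch / base_sequence options
        });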

  add_partitions_to_txn($txn_id, $producer_id, $epoch, \@topics, $cb)
    Register partitions with the transaction coordinator.

  end_txn($txn_id, $producer_id, $epoch, $committed, $cb)
    Commit ("$committed=1") or abort ("$committed=0") a transaction.

  txn_offset_commit($txn_id, $group_id, $producer_id, $epoch, $generation, $member_id, \@offsets, $cb)
    Commit consumer offsets within a transaction (API 28).

  pending
    Number of requests awaiting broker response.

  state
    Connection state as integer (0=disconnected, 6=ready).

UTILITY FUNCTIONS
  EV::Kafka::_murmur2($key)
    Kafka-compatible murmur2 hash. Returns a non-negative 31-bit integer.
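
    For example, the default key-based partition choice can be reproduced
    as (sketch; $num_partitions is assumed):

        my $partition = EV::Kafka::_murmur2('user-42') % $num_partitions;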

  EV::Kafka::_crc32c($data)
    CRC32C checksum (Castagnoli). Used internally for RecordBatch integrity.

  EV::Kafka::_error_name($code)
    Convert Kafka error code to string name.

RESULT STRUCTURES
  Produce result
        $result = {
            topics => [{
                topic      => 'name',
                partitions => [{
                    partition   => 0,
                    error_code  => 0,
                    base_offset => 42,
                }],
            }],
        };

  Fetch result
        $result = {
            topics => [{
                topic      => 'name',
                partitions => [{
                    partition      => 0,
                    error_code     => 0,
                    high_watermark => 100,
                    records => [{
                        offset    => 42,
                        timestamp => 1712345678000,
                        key       => 'key',         # or undef
                        value     => 'value',       # or undef
                        headers   => { h => 'v' },  # if present
                    }],
                }],
            }],
        };

  Metadata result
        $result = {
            controller_id => 0,
            brokers => [{ node_id => 0, host => '10.0.0.1', port => 9092 }],
            topics  => [{
                name       => 'topic',
                error_code => 0,
                partitions => [{
                    partition  => 0,
                    leader     => 0,
                    error_code => 0,
                }],
            }],
        };

ERROR HANDLING
    Errors are delivered through two channels:

    Connection-level errors fire the "on_error" callback (or "croak" if none
    set). These include connection refused, DNS failure, TLS errors, SASL
    auth failure, and protocol violations.

    Request-level errors are delivered as the second argument to the request
    callback: "$cb->($result, $error)". If $error is defined, $result may be
    undef.

    Within result structures, per-partition "error_code" fields use Kafka
    numeric codes:

        0   No error
        1   OFFSET_OUT_OF_RANGE
        3   UNKNOWN_TOPIC_OR_PARTITION
        6   NOT_LEADER_OR_FOLLOWER
        15  COORDINATOR_NOT_AVAILABLE
        16  NOT_COORDINATOR
        25  UNKNOWN_MEMBER_ID
        27  REBALANCE_IN_PROGRESS
        36  TOPIC_ALREADY_EXISTS
        79  MEMBER_ID_REQUIRED

    When a broker disconnects mid-flight, all pending callbacks receive
    "(undef, "connection closed by broker")" or "(undef, "disconnected")".
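
    Putting the two channels together, a produce callback can check both
    the request-level error and the per-partition code (sketch):

        $kafka->produce('topic', 'k', 'v', sub {
            my ($result, $err) = @_;
            return warn "request failed: $err\n" if defined $err;
            my $p = $result->{topics}[0]{partitions}[0];
            warn "broker error: "
                . EV::Kafka::Client::error_name($p->{error_code}) . "\n"
                if $p->{error_code};
        });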

ENVIRONMENT VARIABLES
    These are used by tests and examples (not by the module itself):

        TEST_KAFKA_BROKER    broker address for tests (host:port)
        KAFKA_BROKER         broker address for examples
        KAFKA_HOST           broker hostname for low-level examples
        KAFKA_PORT           broker port for low-level examples
        KAFKA_TOPIC          topic name for examples
        KAFKA_GROUP_ID       consumer group for examples
        KAFKA_LIMIT          message limit for consume example
        KAFKA_COUNT          message count for fire-and-forget
        BENCH_BROKER         broker for benchmarks
        BENCH_MESSAGES       message count for benchmarks
        BENCH_VALUE_SIZE     value size in bytes for benchmarks
        BENCH_TOPIC          topic name for benchmarks

QUICK START
    Minimal producer + consumer lifecycle:

        use EV;
        use EV::Kafka;

        my $kafka = EV::Kafka->new(
            brokers    => '127.0.0.1:9092',
            acks       => 1,
            on_error   => sub { warn "kafka: @_\n" },
            on_message => sub {
                my ($topic, $part, $offset, $key, $value) = @_;
                print "got: $key=$value\n";
            },
        );

        $kafka->connect(sub {
            # produce
            $kafka->produce('test', 'k1', 'hello', sub {
                print "produced\n";

                # consume from the beginning
                $kafka->assign([{topic=>'test', partition=>0, offset=>0}]);
                $kafka->seek('test', 0, -2, sub {
                    my $t = EV::timer 0, 0.1, sub { $kafka->poll };
                    $kafka->{cfg}{_t} = $t;
                });
            });
        });

        EV::run;

COOKBOOK
  Produce JSON with headers
        use JSON::PP;
        my $json = JSON::PP->new->utf8;

        $kafka->produce('events', 'user-42',
            $json->encode({ action => 'click', page => '/home' }),
            { headers => { 'content-type' => 'application/json' } },
            sub { ... }
        );

  Consume from latest offset only
        $kafka->subscribe('live-feed',
            group_id          => 'realtime',
            auto_offset_reset => 'latest',
            on_assign         => sub { print "ready\n" },
        );

  Graceful shutdown
        $SIG{INT} = sub {
            $kafka->commit(sub {
                $kafka->unsubscribe(sub {
                    $kafka->close(sub { EV::break });
                });
            });
        };

  At-least-once processing
        $kafka->subscribe('jobs',
            group_id    => 'workers',
            auto_commit => 0,
        );

        # in the constructor's on_message: process first, then commit
        on_message => sub {
            process($_[4]);    # $_[4] is the message value
            $kafka->commit if ++$count % 100 == 0;
        },

  Batch produce
        $kafka->produce_many([
            ['events', 'k1', 'v1'],
            ['events', 'k2', 'v2'],
            ['events', 'k3', 'v3'],
        ], sub {
            my $errs = shift;
            print $errs ? "some failed\n" : "all done\n";
        });

  Exactly-once stream processing (EOS)
        my $kafka = EV::Kafka->new(
            brokers          => '...',
            transactional_id => 'my-eos-app',
            acks             => -1,
            on_message => sub {
                my ($t, $p, $off, $key, $value) = @_;
                my $result = process($value);
                $kafka->produce('output-topic', $key, $result);
            },
        );

        # consume-process-produce loop:
        $kafka->begin_transaction;
        $kafka->poll(sub {
            $kafka->send_offsets_to_transaction('my-group', sub {
                $kafka->commit_transaction(sub {
                    $kafka->begin_transaction;  # next transaction
                });
            });
        });

  Topic administration
        my $conn = EV::Kafka::Conn::_new('EV::Kafka::Conn', undef);
        $conn->on_connect(sub {
            $conn->create_topics(
                [{ name => 'new-topic', num_partitions => 6, replication_factor => 3 }],
                10000, sub { ... }
            );
        });

BENCHMARKS
    Measured on Linux with TCP loopback to Redpanda, 100-byte values, Perl
    5.40.2, 50K messages ("bench/benchmark.pl"):

        Pipeline produce (acks=1)    68K msg/sec     7.4 MB/s
        Fire-and-forget (acks=0)    100K msg/sec    11.0 MB/s
        Fetch throughput             31K msg/sec     3.4 MB/s
        Sequential round-trip        19K msg/sec    54 us avg latency
        Metadata request             25K req/sec    41 us avg latency

    Throughput by value size (pipelined, acks=1):

           10 bytes    61K msg/sec      0.9 MB/s
          100 bytes    68K msg/sec      7.4 MB/s
         1000 bytes    50K msg/sec     50.2 MB/s
        10000 bytes    18K msg/sec    178.5 MB/s

    Pipeline produce throughput is limited by Perl callback overhead per
    message. Fire-and-forget mode ("acks=0") skips the response cycle
    entirely, reaching ~100K msg/sec. Sequential round-trip (one produce,
    wait for ack, repeat) measures raw broker latency at ~54 microseconds.

    The fetch path is sequential (fetch, process, fetch again) which
    introduces one round-trip per batch. With larger "max_bytes" and dense
    topics, fetch throughput increases proportionally.

    Run "perl bench/benchmark.pl" for throughput results. Set
    "BENCH_BROKER", "BENCH_MESSAGES", "BENCH_VALUE_SIZE", and "BENCH_TOPIC"
    to customize.

    Run "perl bench/latency.pl" for a latency histogram with percentiles
    (min, avg, median, p90, p95, p99, max).

KAFKA PROTOCOL
    This module implements the Kafka binary protocol directly in XS. All
    integers are big-endian. Requests use a 4-byte size prefix followed by a
    header (API key, version, correlation ID, client ID) and a
    version-specific body.
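
    A rough sketch of that framing in Perl's pack (illustrative only; the
    real encoder lives in XS):

        my $body   = '';   # version-specific request body (placeholder)
        my $header = pack('s> s> l> n/a*', 3, 5, 42, 'ev-kafka');
        #   api_key=3 (Metadata), version=5, correlation id=42,
        #   client id as an int16-length-prefixed string, all big-endian
        my $frame  = pack('N/a*', $header . $body);   # int32 size prefix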

    Responses are matched to requests by correlation ID. The broker
    guarantees FIFO ordering per connection, so the response queue is a
    simple FIFO.

    RecordBatch encoding (magic=2) is used for produce. CRC32C covers the
    batch from attributes through the last record. Records use
    ZigZag-encoded varints for lengths and deltas.

    The connection handshake sends ApiVersions (v0) on connect to discover
    supported protocol versions. SASL authentication uses SaslHandshake (v1)
    + SaslAuthenticate (v2) with PLAIN mechanism.

    Consumer group protocol uses sticky partition assignment with
    MEMBER_ID_REQUIRED (error 79) retry per KIP-394.

    Non-flexible API versions are used throughout (capped below the
    flexible-version threshold for each API) to avoid the compact encoding
    complexity.

LIMITATIONS
    *   No snappy or zstd compression -- LZ4 and gzip are supported when
        built with liblz4 and zlib; snappy and zstd are not implemented.

    *   Transactions / EOS require "transactional_id" --
        "begin_transaction", "send_offsets_to_transaction",
        "commit_transaction", and "abort_transaction" provide full
        exactly-once stream processing. "InitProducerId",
        "AddPartitionsToTxn", "TxnOffsetCommit", and "EndTxn" are all
        wired, but these methods are only available when
        "transactional_id" is set in the constructor.

    *   No GSSAPI/OAUTHBEARER -- SASL/PLAIN and SCRAM-SHA-256/512 are
        supported. GSSAPI (Kerberos) and OAUTHBEARER are not implemented.

    *   Sticky partition assignment -- assignments are preserved across
        rebalances where possible. New partitions are distributed to the
        least-loaded member. Overloaded members shed excess partitions.

    *   Blocking DNS resolution -- "getaddrinfo" is called synchronously in
        "conn_start_connect". For fully non-blocking operation, use IP
        addresses instead of hostnames.

    *   No flexible API versions -- all API versions are capped below the
        flexible-version threshold to avoid compact string/array encoding.
        This limits interoperability with very new protocol features but
        works with all Kafka 0.11+ and Redpanda brokers.

    *   Limited produce retry -- transient errors (NOT_LEADER,
        COORDINATOR_NOT_AVAILABLE) trigger metadata refresh and up to 3
        retries with backoff. Non-retriable errors are surfaced to the
        callback immediately.

AUTHOR
    vividsnow

LICENSE
    This library is free software; you can redistribute it and/or modify it
    under the same terms as Perl itself.

