NAME
    Data::PubSub::Shared - High-performance shared-memory pub/sub for Linux

SYNOPSIS
        use Data::PubSub::Shared;

        # Publisher
        my $ps = Data::PubSub::Shared::Int->new('/tmp/ps.shm', 1024);
        $ps->publish(42);
        $ps->publish_multi(1, 2, 3);

        # Subscriber (same or different process)
        my $ps2 = Data::PubSub::Shared::Int->new('/tmp/ps.shm', 1024);
        my $sub  = $ps2->subscribe;      # future messages only
        my $sub2 = $ps2->subscribe_all;  # from oldest available

        # Polling
        my $val = $sub->poll;            # non-blocking, undef if empty
        my @v   = $sub->drain;           # all available
        my $val = $sub->poll_wait(1.5);  # blocking with timeout

        # Callback-based (no per-message method dispatch)
        $sub->poll_cb(sub { process($_[0]) });

        # String variant
        my $sps = Data::PubSub::Shared::Str->new('/tmp/ps.shm', 1024);
        $sps->publish("hello world");

        # Compact variants (half the memory, same API)
        my $ps32 = Data::PubSub::Shared::Int32->new(undef, 65536);
        my $ps16 = Data::PubSub::Shared::Int16->new(undef, 65536);

        # Multiprocess
        if (fork() == 0) {
            my $child = Data::PubSub::Shared::Int->new('/tmp/ps.shm', 1024);
            my $sub = $child->subscribe;
            while (defined(my $v = $sub->poll_wait(1))) {
                print "got: $v\n";
            }
            exit;
        }
        $ps->publish(99);
        wait;

DESCRIPTION
    Broadcast pub/sub over shared memory (mmap(MAP_SHARED)). Publishers
    write to a ring buffer; each subscriber independently reads with its own
    cursor. Messages are never consumed -- the ring overwrites old data when
    it wraps. Slow subscribers auto-recover by resetting to the oldest
    available position.

    Linux-only. Requires 64-bit Perl.

  Features
    *   File-backed, anonymous, or memfd-backed mmap

    *   Lock-free MPMC publish for integer variants

    *   Lock-free subscribers for all variants (seqlock)

    *   Variable-length Str messages (circular arena)

    *   Futex-based blocking poll with timeout

    *   PID-based stale lock recovery (Str)

    *   Batch operations: publish_multi, drain, poll_cb, poll_wait_multi

    *   Per-subscriber overflow counting

    *   Keyword API via XS::Parse::Keyword

  Variants
    Data::PubSub::Shared::Int -- int64, 16 bytes/slot
        Lock-free MPMC publish via atomic fetch-and-add. Seqlock-protected
        subscribers. Best for counters, timestamps, event IDs.

    Data::PubSub::Shared::Int32 -- int32, 8 bytes/slot
    Data::PubSub::Shared::Int16 -- int16, 8 bytes/slot
        Compact variants -- half the memory, 2x cache density. Same
        lock-free algorithm. Values silently truncated to type range (C cast
        semantics). Best for status codes, small enums, sensor readings.

    Data::PubSub::Shared::Str -- variable-length strings
        Mutex-protected publish, lock-free subscribers. Messages stored in a
        circular arena (max capped at "msg_size", default 256 bytes). UTF-8
        flag preserved. Best for log lines, JSON, serialized payloads.

  Int vs Str
    Int (including Int32/Int16): lock-free, zero contention between
    publishers. Use when the payload fits in an integer.

    Str: mutex serializes publishers, but subscribers are still lock-free.
    Use for arbitrary byte strings.

API
  Constructor
        my $ps = Data::PubSub::Shared::Int->new($path, $capacity);
        my $ps = Data::PubSub::Shared::Str->new($path, $capacity);
        my $ps = Data::PubSub::Shared::Str->new($path, $capacity, $msg_size);

    $capacity is rounded up to the next power of 2. When opening an existing
    file, parameters are read from the stored header. Pass "undef" for $path
    for anonymous (fork-inherited) pub/sub.

    Replace "Int" with "Int32", "Int16", or "Str" as needed.

  memfd
        my $ps = Data::PubSub::Shared::Int->new_memfd($name, $capacity);
        my $fd = $ps->memfd;
        my $ps2 = Data::PubSub::Shared::Int->new_from_fd($fd);

    No filesystem path -- backed by memfd_create(2). Share via fork()
    inheritance or "SCM_RIGHTS" fd passing. The fd is dup'd internally by
    "new_from_fd".

  Publishing
        $ps->publish($value);                # always succeeds
        my $n = $ps->publish_multi(@values); # batch (max 8192 values)
        $ps->publish_notify($value);         # publish + eventfd notify

    Int: "publish_multi" claims all slots in one atomic fetch-add, then
    writes values and wakes subscribers once. Str: holds mutex for the
    entire batch.

  Subscribing
        my $sub = $ps->subscribe;       # future messages only
        my $sub = $ps->subscribe_all;   # from oldest available

    Subscribers are process-local. Each process creates its own.

  Polling
        my $val = $sub->poll;                       # non-blocking
        my @v   = $sub->poll_multi($n);             # up to $n
        my @v   = $sub->drain;                      # all available
        my @v   = $sub->drain($max);                # up to $max
        my $val = $sub->poll_wait;                  # block forever
        my $val = $sub->poll_wait($timeout);        # block with timeout
        my @v   = $sub->poll_wait_multi($n, $timeout);  # block for >=1

  Callback Polling
        my $n = $sub->poll_cb(\&handler);

    Calls handler($msg) for each available message without returning to Perl
    between messages. Returns count processed.

  Event Loop Integration
        my $fd = $ps->eventfd;           # create eventfd
        $ps->notify;                     # signal after publish
        $ps->eventfd_consume;            # drain notification counter

        # Combined: consume eventfd + drain messages
        my @v = $sub->drain_notify;
        my @v = $sub->drain_notify($max);

        # EV example
        my $w = EV::io $fd, EV::READ, sub {
            my @msgs = $sub->drain_notify;
            process($_) for @msgs;
        };

    Subscribers inherit the handle's eventfd at creation time. Use
    "$sub->eventfd_set($fd)" to set manually after creation.

  Status
        my $n = $sub->lag;             # messages behind
        my $n = $sub->overflow_count;  # total messages lost to overflow
        my $o = $sub->has_overflow;    # true if currently overflowed
        my $c = $sub->cursor;         # read position
        $sub->cursor($pos);           # seek
        my $p = $sub->write_pos;      # publisher position

  Cursor Management
        $sub->reset;         # jump to latest (future messages only)
        $sub->reset_oldest;  # jump to oldest available

    If a subscriber falls behind by more than "capacity" messages, "poll"
    auto-recovers by resetting to the oldest available position. Lost
    messages are counted in "overflow_count".

  Handle Management
        $ps->clear;              # reset ring to initial state
        $ps->sync;               # msync to disk
        $ps->unlink;             # remove backing file
        Class->unlink($path);    # class method form
        my $p = $ps->path;       # undef for anonymous/memfd
        my $s = $ps->stats;      # diagnostic hashref

  Keyword API
        use Data::PubSub::Shared::Int;

        ps_int_publish $ps, $value;
        my $val = ps_int_poll $sub;
        my $n   = ps_int_lag $sub;

    Replace "int" with "int32", "int16", or "str". Keywords are lexically
    scoped.

  Crash Safety
    Str mode: futex mutex with PID tracking. If a publisher dies holding the
    mutex, other publishers recover within 2 seconds. Int modes are
    lock-free and need no recovery.

BENCHMARKS
    Single-process, 1M items, Linux x86_64. Run "perl -Mblib
    bench/throughput.pl" to reproduce.

        PUBLISH + POLL (interleaved)
        Int     5.0M/s   (16 bytes/slot)
        Int32   5.9M/s   (8 bytes/slot)
        Int16   5.7M/s   (8 bytes/slot)
        Str     2.5M/s   (~30B messages)

        BATCH (100/batch)
        Int publish_multi:    170M/s
        Str publish_multi:     42M/s

    Fan-out: publish throughput is independent of subscriber count.

SEE ALSO
    Data::Buffer::Shared - typed shared array

    Data::HashMap::Shared - concurrent hash table

    Data::Queue::Shared - FIFO queue

    Data::ReqRep::Shared - request-reply

    Data::Sync::Shared - synchronization primitives

    Data::Pool::Shared - fixed-size object pool

    Data::Stack::Shared - LIFO stack

    Data::Deque::Shared - double-ended queue

    Data::Log::Shared - append-only log (WAL)

    Data::Heap::Shared - priority queue

    Data::Graph::Shared - directed weighted graph

    Data::BitSet::Shared - shared bitset (lock-free per-bit ops)

    Data::RingBuffer::Shared - fixed-size overwriting ring buffer

AUTHOR
    vividsnow

LICENSE
    This is free software; you can redistribute it and/or modify it under
    the same terms as Perl itself.

