Stream Event Log Blueprint
Append-only event log with monotonically increasing IDs, consumer groups for distributed processing, and automatic acknowledgment tracking
| Feature | stream-event-log |
| Category | Data |
| Version | 1.0.0 |
| Tags | streams, event-log, consumer-groups, message-queue, ack-tracking, ordering |
| YAML Source | View on GitHub |
| JSON API | stream-event-log.json |
Actors
| ID | Name | Type | Description |
|---|---|---|---|
| producer | Producer | system | Application adding events to stream |
| consumer | Consumer | system | Application reading events from stream |
| consumer_group | Consumer Group | system | Named group tracking consumer progress and pending messages |
Fields
| Name | Type | Required | Label | Description |
|---|---|---|---|---|
| key | text | Yes | Key | |
| entry_id | text | No | Entry Id | |
| fields | json | No | Fields | |
| group_name | text | No | Group Name | |
| consumer_name | text | No | Consumer Name | |
| pending_entries | json | No | Pending Entries | |
States
State field: message_delivery
Values:
| State | Initial | Terminal |
|---|---|---|
| undelivered | Yes | |
| pending | | |
| acknowledged | | Yes |
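The state table above can be read as a small state machine. The sketch below is illustrative only: it assumes the only legal moves are the two named in the outcomes (undelivered to pending on a group read, pending to acknowledged on XACK). The `ALLOWED` table and `transition` helper are hypothetical names, not part of the blueprint.

```python
# Hypothetical transition table for the message_delivery state field.
# Assumption: only the transitions named in the outcomes are legal.
ALLOWED = {
    "undelivered": {"pending"},       # delivered through a consumer group read
    "pending": {"acknowledged"},      # acknowledged through XACK
    "acknowledged": set(),            # terminal state: no way out
}


def transition(current: str, target: str) -> str:
    """Return the new state, or raise ValueError on an illegal move."""
    if target not in ALLOWED[current]:
        raise ValueError(f"illegal transition {current} -> {target}")
    return target


state = transition("undelivered", "pending")
state = transition(state, "acknowledged")
```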
Rules
- general:
  - Stream entry IDs are globally ordered; a new ID is always greater than every previous ID
  - Entry IDs are auto-generated from a millisecond timestamp and a sequence counter
  - Consumer groups track their position with last_id (messages after this ID are new)
  - Consumer groups maintain a Pending Entry List (PEL) of unacknowledged messages
  - Messages in the PEL are tracked by both group and consumer (dual indexing)
  - Idle messages in the PEL can be claimed by other consumers
  - All stream operations are atomic with respect to the stream key
  - Entry deletion leaves a tombstone (space is not reclaimed)
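The first two rules (IDs built from a millisecond timestamp plus a sequence counter, always increasing) can be sketched as follows. This is a minimal illustration, not the blueprint's implementation: the `EntryIdGenerator` class, its injectable `clock`, and the `<ms>-<seq>` string format are assumptions made for the example.

```python
import time


class EntryIdGenerator:
    """Hypothetical generator of '<ms>-<seq>' IDs that never go backwards."""

    def __init__(self, clock=lambda: int(time.time() * 1000)):
        self._clock = clock      # injectable so the behaviour can be tested
        self._last_ms = 0
        self._last_seq = 0

    def next_id(self) -> str:
        now_ms = self._clock()
        if now_ms > self._last_ms:
            # New millisecond: restart the sequence counter.
            self._last_ms, self._last_seq = now_ms, 0
        else:
            # Same millisecond, or the clock moved backwards:
            # keep the last timestamp and bump the sequence instead.
            self._last_seq += 1
        return f"{self._last_ms}-{self._last_seq}"


def parse_id(entry_id: str):
    """Split an ID into a (ms, seq) tuple so IDs compare numerically."""
    ms, seq = entry_id.split("-")
    return int(ms), int(seq)


# Simulated clock readings, including one step backwards (999).
ticks = iter([1000, 1000, 999, 1001])
gen = EntryIdGenerator(clock=lambda: next(ticks))
ids = [gen.next_id() for _ in range(4)]
```

Comparing the parsed tuples rather than the raw strings matters: as strings, `"1000-0"` sorts before `"999-0"`.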
Outcomes
Xadd_entry (Priority: 10)
Add new entry with auto-generated or explicit ID
Given:
- XADD key [ID|*] field value [field value …]
- id_generation(input) exists
Then:
- set_field target: entry_id (create monotonic ID)
- set_field target: fields (store event data)
- emit_event event: stream.entry_added
Result: new entry ID returned to producer
Xadd_with_trimming (Priority: 11)
Add entry and trim stream to max length/id
Given:
- XADD with MAXLEN or MINID flag
- trim_strategy(input) exists
Then:
- set_field target: entry_id
- set_field target: fields
- emit_event event: stream.trimmed
Result: new entry ID; stream trimmed per strategy
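The two trim strategies differ only in how the cut point is chosen. The sketch below models a stream as a Python list of `(entry_id, fields)` tuples in ascending ID order. That representation and the helper names `trim_maxlen` and `trim_minid` are assumptions for illustration.

```python
def parse_id(entry_id):
    """Split '<ms>-<seq>' into a tuple so IDs compare numerically."""
    ms, seq = entry_id.split("-")
    return int(ms), int(seq)


def trim_maxlen(stream, maxlen):
    """Drop the oldest entries until at most maxlen remain; return the count dropped."""
    excess = max(0, len(stream) - maxlen)
    del stream[:excess]
    return excess


def trim_minid(stream, min_id):
    """Drop every entry whose ID is lower than min_id; return the count dropped."""
    threshold = parse_id(min_id)
    keep_from = 0
    while keep_from < len(stream) and parse_id(stream[keep_from][0]) < threshold:
        keep_from += 1
    del stream[:keep_from]
    return keep_from


stream = [("1-0", {}), ("2-0", {}), ("3-0", {}), ("4-0", {})]
dropped_by_len = trim_maxlen(stream, 3)      # removes "1-0"
dropped_by_id = trim_minid(stream, "3-0")    # removes "2-0"
```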
Xadd_idempotent (Priority: 12)
Add with idempotent deduplication (IDMP)
Given:
- XADD with IDMP
- duplicate(db) eq true
Then:
- emit_event event: stream.duplicate_detected
Result: existing entry ID returned (no new entry added)
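One way to picture the deduplication outcome is a lookup table from an idempotency key to the entry ID it first produced. The sketch below is a guess at the shape of such a mechanism: the `DedupStream` class, the `idempotency_key` parameter, and the `0-<n>` IDs are all hypothetical and do not describe how any particular server implements IDMP.

```python
class DedupStream:
    """Hypothetical append-only log that ignores repeated idempotency keys."""

    def __init__(self):
        self.entries = []    # list of (entry_id, fields)
        self._seen = {}      # idempotency key -> entry_id of the first add
        self._seq = 0

    def add(self, idempotency_key, fields):
        """Return (entry_id, created). A repeated key returns the original ID."""
        if idempotency_key in self._seen:
            return self._seen[idempotency_key], False
        self._seq += 1
        entry_id = f"0-{self._seq}"   # placeholder ID scheme for the sketch
        self.entries.append((entry_id, fields))
        self._seen[idempotency_key] = entry_id
        return entry_id, True


log = DedupStream()
first_id, first_created = log.add("order-42", {"status": "paid"})
retry_id, retry_created = log.add("order-42", {"status": "paid"})
```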
Xread_entries (Priority: 20)
Read entries starting after given ID
Given:
- XREAD [COUNT count] STREAMS key id
- start_id(input) exists
Then:
- emit_event event: stream.read
Result: array of entries [id, [field1, value1, …]] or nil if empty
Xread_range (Priority: 21)
Get entries by ID range
Given:
- command(input) in XRANGE, XREVRANGE
- start_id(input) exists
- end_id(input) exists
Then:
- emit_event event: stream.range_read
Result: array of entries in range (XREVRANGE returns reverse order)
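A range read is a filter over the ordered entries with inclusive bounds. The sketch below assumes the stream is a list of `(entry_id, fields)` tuples in ascending order. `read_range` is a hypothetical helper, and its `reverse` flag stands in for the XREVRANGE variant.

```python
def parse_id(entry_id):
    """Split '<ms>-<seq>' into a tuple so IDs compare numerically."""
    ms, seq = entry_id.split("-")
    return int(ms), int(seq)


def read_range(entries, start_id, end_id, reverse=False):
    """Return entries whose ID lies in [start_id, end_id], optionally newest first."""
    lo, hi = parse_id(start_id), parse_id(end_id)
    hits = [e for e in entries if lo <= parse_id(e[0]) <= hi]
    return hits[::-1] if reverse else hits


entries = [("1-0", {"n": "a"}), ("2-0", {"n": "b"}), ("2-1", {"n": "c"}), ("3-0", {"n": "d"})]
forward = [e[0] for e in read_range(entries, "2-0", "3-0")]
backward = [e[0] for e in read_range(entries, "2-0", "3-0", reverse=True)]
```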
Xread_blocking (Priority: 22)
Block until new entries arrive
Given:
- XREAD BLOCK timeout_ms … STREAMS key id
- timeout_ms(input) exists
- new_entries_available(system) eq false
Then:
- transition_state field: message_delivery to: suspended
- emit_event event: stream.blocking_read
Result: client blocks until new entries or timeout; returns entries or nil
Xlen_count (Priority: 23)
Get stream length (non-deleted entries)
Given:
- XLEN key
Then:
- emit_event event: stream.length_read
Result: number of non-deleted entries
Xgroup_create (Priority: 30)
Create consumer group
Given:
- XGROUP CREATE key group id
- id(input) exists
Then:
- set_field target: group_name
- transition_state field: consumer_group_state to: new
- emit_event event: stream.group_created
Result: OK returned; group created and ready
Xgroup_destroy (Priority: 31)
Delete consumer group
Given:
- XGROUP DESTROY key group
Then:
- emit_event event: stream.group_deleted
Result: OK returned; group and its PEL deleted
Xgroup_setid (Priority: 32)
Update group read position
Given:
- XGROUP SETID key group id
Then:
- emit_event event: stream.group_position_updated
Result: OK returned; future XREADGROUP starts at new position
Xgroup_createconsumer (Priority: 33)
Explicitly create consumer in group
Given:
- XGROUP CREATECONSUMER key group consumer
Then:
- emit_event event: stream.consumer_created
Result: 1 if new consumer created, 0 if already existed
Xgroup_delconsumer (Priority: 34)
Remove consumer from group
Given:
- XGROUP DELCONSUMER key group consumer
Then:
- emit_event event: stream.consumer_deleted
Result: count of pending entries that were removed
Xreadgroup_entries (Priority: 35)
Read as consumer group member
Given:
- XREADGROUP GROUP group consumer STREAMS key id
- id(input) exists
- messages_available(db) eq true
Then:
- set_field target: pending_entries (create NACK for each delivered message)
- transition_state field: message_delivery to: pending
- emit_event event: stream.group_read
Result: array of entries, each automatically added to the consumer's PEL
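The group read ties together two rules: the group's last_id decides which entries count as new, and every delivered entry is recorded in the PEL under both the group and the consumer. The sketch below is an in-memory illustration under those assumptions. The `ConsumerGroup` class, its field names, and the explicit `now_ms` argument are hypothetical.

```python
def parse_id(entry_id):
    """Split '<ms>-<seq>' into a tuple so IDs compare numerically."""
    ms, seq = entry_id.split("-")
    return int(ms), int(seq)


class ConsumerGroup:
    """Hypothetical consumer group with a dually indexed Pending Entry List."""

    def __init__(self, last_id="0-0"):
        self.last_id = last_id     # entries after this ID count as new
        self.pel = {}              # group index: entry_id -> delivery record
        self.by_consumer = {}      # consumer index: consumer -> set of entry IDs

    def read_new(self, entries, consumer, now_ms, count=None):
        """Deliver entries newer than last_id and record each in both indexes."""
        fresh = [e for e in entries if parse_id(e[0]) > parse_id(self.last_id)]
        if count is not None:
            fresh = fresh[:count]
        for entry_id, _fields in fresh:
            self.pel[entry_id] = {
                "consumer": consumer,
                "delivered_at_ms": now_ms,
                "delivery_count": 1,
            }
            self.by_consumer.setdefault(consumer, set()).add(entry_id)
            self.last_id = entry_id    # advance the group's read position
        return fresh


entries = [("1-0", {"a": "1"}), ("2-0", {"a": "2"}), ("3-0", {"a": "3"})]
group = ConsumerGroup()
alice_batch = group.read_new(entries, "alice", now_ms=100, count=2)
bob_batch = group.read_new(entries, "bob", now_ms=200)
```

Because the group shares one last_id, the second consumer only receives what the first did not, which is how the group distributes work.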
Xreadgroup_blocking (Priority: 36)
Block within consumer group
Given:
- XREADGROUP BLOCK timeout_ms …
- new_messages(db) eq false
Then:
- transition_state field: message_delivery to: suspended
- emit_event event: stream.group_blocking_read
Result: client blocks; returns entries or nil on timeout
Xack_messages (Priority: 40)
Acknowledge messages as processed
Given:
- XACK key group id [id …]
- ids_in_pel(db) exists
Then:
- set_field target: pending_entries (remove from group PEL and consumer PEL)
- transition_state field: message_delivery to: acknowledged
- emit_event event: stream.acked
Result: count of acknowledged messages (0 if already acked or not found)
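Acknowledgment removes an ID from both PEL indexes and counts only the IDs that were actually pending. The sketch below assumes the PEL is held as two plain dictionaries (`pel` keyed by entry ID, `by_consumer` keyed by consumer name). Both structures and the `ack` helper are illustrative.

```python
def ack(pel, by_consumer, ids):
    """Remove ids from both PEL indexes; return how many were actually pending."""
    acked = 0
    for entry_id in ids:
        record = pel.pop(entry_id, None)
        if record is None:
            continue    # already acknowledged or never delivered: not counted
        by_consumer[record["consumer"]].discard(entry_id)
        acked += 1
    return acked


pel = {"1-0": {"consumer": "alice"}, "2-0": {"consumer": "alice"}}
by_consumer = {"alice": {"1-0", "2-0"}}
first_ack = ack(pel, by_consumer, ["1-0", "9-9"])   # "9-9" was never pending
repeat_ack = ack(pel, by_consumer, ["1-0"])         # second ack is a no-op
```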
Xpending_summary (Priority: 41)
Get pending message summary
Given:
- XPENDING key group
Then:
- emit_event event: stream.pending_summary
Result: [total_pending, first_pending_id, last_pending_id, [[consumer, count], …]]
Xpending_details (Priority: 42)
Get detailed pending message info
Given:
- XPENDING key group [IDLE min_idle] start end count
- idle_filter(input) exists
Then:
- emit_event event: stream.pending_details
Result: array of [id, consumer, idle_ms, delivery_count]
Xclaim_messages (Priority: 43)
Claim idle messages from other consumer
Given:
- XCLAIM key group new_consumer min_idle_ms id [id …] [IDLE ms] [RETRYCOUNT count]
- message_idle(db) gte min_idle_ms
Then:
- set_field target: pending_entries (transfer from old consumer to new consumer)
- emit_event event: stream.claimed
Result: array of claimed messages [id, [field1, value1, …]] or empty if none eligible
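Claiming moves a pending entry from one consumer to another, but only when the entry has been idle for at least min_idle_ms. The sketch below reuses the two-dictionary PEL layout as an assumption. The `claim` helper and the record fields `delivered_at_ms` and `delivery_count` are hypothetical, and for brevity it returns only the claimed IDs rather than full entries.

```python
def claim(pel, by_consumer, new_consumer, min_idle_ms, ids, now_ms):
    """Transfer sufficiently idle pending entries to new_consumer; return their IDs."""
    claimed = []
    for entry_id in ids:
        record = pel.get(entry_id)
        if record is None:
            continue    # not pending, so there is nothing to claim
        if now_ms - record["delivered_at_ms"] < min_idle_ms:
            continue    # still fresh: the current owner keeps it
        # Move the entry between consumer indexes and reset its idle clock.
        by_consumer[record["consumer"]].discard(entry_id)
        by_consumer.setdefault(new_consumer, set()).add(entry_id)
        record["consumer"] = new_consumer
        record["delivered_at_ms"] = now_ms
        record["delivery_count"] += 1
        claimed.append(entry_id)
    return claimed


pel = {
    "1-0": {"consumer": "alice", "delivered_at_ms": 0, "delivery_count": 1},
    "2-0": {"consumer": "alice", "delivered_at_ms": 900, "delivery_count": 1},
}
by_consumer = {"alice": {"1-0", "2-0"}}
taken = claim(pel, by_consumer, "bob", 500, ["1-0", "2-0", "7-0"], now_ms=1000)
```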
Xautoclaim_messages (Priority: 44)
Auto-claim idle messages with pagination
Given:
- XAUTOCLAIM key group consumer min_idle_ms start_id [COUNT count]
Then:
- emit_event event: stream.autoclaimed
Result: [cursor_id, [[id, [field, value, …]], …]]
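The cursor in the result lets a caller sweep a large PEL in bounded batches. The sketch below shows one plausible reading of that loop: scan pending IDs in order from start_id, claim idle ones up to `count`, and return the ID to resume from, with `"0-0"` signalling that the scan reached the end. The `autoclaim` helper and the single-dictionary PEL are assumptions made to keep the example short.

```python
def parse_id(entry_id):
    """Split '<ms>-<seq>' into a tuple so IDs compare numerically."""
    ms, seq = entry_id.split("-")
    return int(ms), int(seq)


def autoclaim(pel, new_consumer, min_idle_ms, start_id, now_ms, count=100):
    """Claim up to count idle entries at or after start_id; return (cursor, claimed_ids)."""
    candidates = sorted(
        (i for i in pel if parse_id(i) >= parse_id(start_id)), key=parse_id
    )
    claimed = []
    for entry_id in candidates:
        if len(claimed) == count:
            return entry_id, claimed    # batch is full: resume here next call
        record = pel[entry_id]
        if now_ms - record["delivered_at_ms"] >= min_idle_ms:
            record["consumer"] = new_consumer
            record["delivered_at_ms"] = now_ms
            claimed.append(entry_id)
    return "0-0", claimed               # whole PEL scanned


pel = {
    "1-0": {"consumer": "alice", "delivered_at_ms": 0},
    "2-0": {"consumer": "alice", "delivered_at_ms": 0},
    "3-0": {"consumer": "alice", "delivered_at_ms": 0},
}
cursor, batch_one = autoclaim(pel, "bob", 500, "0-0", now_ms=1000, count=2)
final_cursor, batch_two = autoclaim(pel, "bob", 500, cursor, now_ms=1000, count=2)
```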
Xdel_entries (Priority: 50)
Mark entries as deleted
Given:
- XDEL key id [id …]
- ids_exist(db) exists
Then:
- emit_event event: stream.deleted
Result: count of deleted entries (0 if not found)
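The tombstone rule means deletion changes the logical length but not the space held. The sketch below illustrates that distinction with a list of slots carrying a deleted flag. The `TombstoneLog` class and its `allocated` method are hypothetical and say nothing about the real storage layout.

```python
class TombstoneLog:
    """Hypothetical log where deleting marks a slot instead of freeing it."""

    def __init__(self):
        self._slots = []    # each slot is [entry_id, fields, deleted_flag]

    def append(self, entry_id, fields):
        self._slots.append([entry_id, fields, False])

    def delete(self, ids):
        """Mark matching live entries as deleted; return how many were marked."""
        wanted = set(ids)
        removed = 0
        for slot in self._slots:
            if slot[0] in wanted and not slot[2]:
                slot[1] = None     # drop the payload
                slot[2] = True     # keep the slot as a tombstone
                removed += 1
        return removed

    def __len__(self):
        """Logical length: non-deleted entries only."""
        return sum(1 for slot in self._slots if not slot[2])

    def allocated(self):
        """Physical slot count, tombstones included."""
        return len(self._slots)


log = TombstoneLog()
for entry_id in ("1-0", "2-0", "3-0"):
    log.append(entry_id, {"v": entry_id})
deleted = log.delete(["2-0", "9-9"])
```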
Xtrim_entries (Priority: 51)
Remove old entries by length or ID threshold
Given:
- XTRIM key [MAXLEN|MINID] [~] threshold [LIMIT count]
- trim_type(input) exists
Then:
- emit_event event: stream.trimmed
Result: count of trimmed entries
Xinfo_stream (Priority: 60)
Get stream metadata
Given:
- XINFO STREAM key
Then:
- emit_event event: stream.info_read
Result: stream information (length, IDs, entry count, consumer group count, etc.)
Xinfo_groups (Priority: 61)
List consumer groups
Given:
- XINFO GROUPS key
Then:
- emit_event event: stream.groups_listed
Result: array of group info (name, consumers_count, pending_entries, last_id)
Xinfo_consumers (Priority: 62)
List consumers in group
Given:
- XINFO CONSUMERS key group
Then:
- emit_event event: stream.consumers_listed
Result: array of consumer info (name, pending_count, idle_time)
Errors
| Code | Status | Message | Retry |
|---|---|---|---|
| WRONGTYPE | 400 | Operation against a key holding the wrong kind of value | No |
| NOGROUP | 404 | No such consumer group | No |
| NOSCRIPT | 400 | Index out of range | No |
Events
| Event | Description | Payload |
|---|---|---|
| stream.entry_added | | |
| stream.trimmed | | |
| stream.duplicate_detected | | |
| stream.read | | |
| stream.range_read | | |
| stream.blocking_read | | |
| stream.length_read | | |
| stream.group_created | | |
| stream.group_deleted | | |
| stream.group_position_updated | | |
| stream.consumer_created | | |
| stream.consumer_deleted | | |
| stream.group_read | | |
| stream.group_blocking_read | | |
| stream.acked | | |
| stream.pending_summary | | |
| stream.pending_details | | |
| stream.claimed | | |
| stream.autoclaimed | | |
| stream.deleted | | |
| stream.info_read | | |
| stream.groups_listed | | |
| stream.consumers_listed | | |
Related Blueprints
| Feature | Relationship | Reason |
|---|---|---|
| pub-sub-messaging | optional | Both provide message delivery; streams add persistence and groups |
| list-queue-operations | optional | Streams are persistent event logs; lists are transient queues |
| key-expiration | optional | Can trim streams by age/count |
AGI Readiness
Goals
Reliable Stream Event Log
Append-only event log with monotonically increasing IDs, consumer groups for distributed processing, and automatic acknowledgment tracking
Success Metrics:
| Metric | Target | Measurement |
|---|---|---|
| data_accuracy | 100% | Records matching source of truth |
| duplicate_rate | 0% | Duplicate records detected post-creation |
Constraints:
- performance (non-negotiable): Data consistency must be maintained across concurrent operations
Autonomy
Level: supervised
Escalation Triggers:
- error_rate > 5
Tradeoffs
| Prefer | Over | Reason |
|---|---|---|
| data_integrity | performance | data consistency must be maintained across all operations |
Safety
| Action | Permission | Cooldown | Max Auto |
|---|---|---|---|
| xadd_entry | autonomous | - | - |
| xadd_with_trimming | autonomous | - | - |
| xadd_idempotent | autonomous | - | - |
| xread_entries | autonomous | - | - |
| xread_range | autonomous | - | - |
| xread_blocking | human_required | - | - |
| xlen_count | autonomous | - | - |
| xgroup_create | supervised | - | - |
| xgroup_destroy | human_required | - | - |
| xgroup_setid | autonomous | - | - |
| xgroup_createconsumer | supervised | - | - |
| xgroup_delconsumer | autonomous | - | - |
| xreadgroup_entries | autonomous | - | - |
| xreadgroup_blocking | human_required | - | - |
| xack_messages | autonomous | - | - |
| xpending_summary | autonomous | - | - |
| xpending_details | autonomous | - | - |
| xclaim_messages | autonomous | - | - |
| xautoclaim_messages | autonomous | - | - |
| xdel_entries | autonomous | - | - |
| xtrim_entries | autonomous | - | - |
| xinfo_stream | autonomous | - | - |
| xinfo_groups | autonomous | - | - |
| xinfo_consumers | autonomous | - | - |