aegis-streaming
Real-time Streaming Engine for Aegis Database Platform.
Overview
Event streaming and change data capture for real-time data processing. Supports pub/sub messaging, event sourcing, and stream processing.
Modules
event.rs
Core event types:
- EventId - Unique event identifier
- EventType - Created, Updated, Deleted, Custom
- Event - Event with source, timestamp, data, metadata
- EventData - Payload (Null, Bool, Int, Float, String, Bytes, Json)
- EventFilter - Filter by type, source, timestamp
- EventBatch - Batch of events
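To make the filtering rule concrete, here is a minimal sketch of how a filter over type, source, and timestamp could be evaluated. The struct layouts, field names (`timestamp_ms`, `after_ms`), and the `matches` method are assumptions for illustration and are not taken from event.rs; the `Json` payload variant is left out so the sketch needs only the standard library, and `Custom` is assumed to carry a name.

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for the crate's types; the real definitions may differ.
#[derive(Debug, Clone, PartialEq)]
pub enum EventType {
    Created,
    Updated,
    Deleted,
    Custom(String), // assumed to carry a user-defined name
}

#[derive(Debug, Clone, PartialEq)]
pub enum EventData {
    Null,
    Bool(bool),
    Int(i64),
    Float(f64),
    String(String),
    Bytes(Vec<u8>),
}

#[derive(Debug, Clone)]
pub struct Event {
    pub event_type: EventType,
    pub source: String,
    pub timestamp_ms: u64,
    pub data: EventData,
    pub metadata: HashMap<String, String>,
}

/// Every populated field must match; an unset field matches anything.
#[derive(Debug, Default, Clone)]
pub struct EventFilter {
    pub event_type: Option<EventType>,
    pub source: Option<String>,
    pub after_ms: Option<u64>,
}

impl EventFilter {
    /// Returns true when the event satisfies all criteria that are set.
    pub fn matches(&self, event: &Event) -> bool {
        self.event_type.as_ref().map_or(true, |t| *t == event.event_type)
            && self.source.as_ref().map_or(true, |s| *s == event.source)
            && self.after_ms.map_or(true, |ts| event.timestamp_ms > ts)
    }
}
```

Treating unset criteria as wildcards means the default filter accepts every event, which is a convenient behavior for subscriptions that do not specify a filter.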
channel.rs
Pub/sub channels:
- ChannelId - Channel identifier
- ChannelConfig - Buffer size, max subscribers, persistence, retention
- Channel - Publish/subscribe messaging
- ChannelReceiver - Async event receiver with filtering
- ChannelStats - Events published, subscriber count
- History support for persistent channels
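The interaction between fan-out, retention, and statistics can be sketched with the standard library alone. This is a synchronous illustration built on `std::sync::mpsc`, not the crate's async implementation; the generic `Channel<T>` type, the `retention` parameter, and the method names are all hypothetical.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical, synchronous stand-in for a pub/sub channel with history.
pub struct Channel<T: Clone> {
    subscribers: Vec<Sender<T>>,
    history: VecDeque<T>,
    retention: usize, // max number of past events kept; 0 disables history
    published: u64,
}

impl<T: Clone> Channel<T> {
    pub fn new(retention: usize) -> Self {
        Channel {
            subscribers: Vec::new(),
            history: VecDeque::new(),
            retention,
            published: 0,
        }
    }

    /// Registers a subscriber and returns its receiving end.
    pub fn subscribe(&mut self) -> Receiver<T> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    /// Fans the event out to every live subscriber and returns how many received it.
    pub fn publish(&mut self, event: T) -> usize {
        // A failed send means the receiver was dropped, so forget that subscriber.
        self.subscribers.retain(|tx| tx.send(event.clone()).is_ok());
        if self.retention > 0 {
            if self.history.len() == self.retention {
                self.history.pop_front(); // evict the oldest retained event
            }
            self.history.push_back(event);
        }
        self.published += 1;
        self.subscribers.len()
    }

    /// Retained events, oldest first.
    pub fn history(&self) -> Vec<T> {
        self.history.iter().cloned().collect()
    }

    /// Total number of events published so far.
    pub fn published(&self) -> u64 {
        self.published
    }
}
```

A late subscriber could replay `history()` before reading from its receiver, which is one plausible way to offer history on persistent channels.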
subscriber.rs
Subscription management:
- SubscriberId - Subscriber identifier
- Subscription - Channels, filter, metadata
- Subscriber - Tracks subscriptions and statistics
- DeliveryMode - AtMostOnce, AtLeastOnce, ExactlyOnce
- AckMode - Auto, Manual, None
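In general, at-least-once delivery combined with manual acknowledgement means a message stays pending until the subscriber confirms it, and anything unconfirmed is eligible for redelivery. The sketch below shows that bookkeeping in isolation. `PendingAcks` and its methods are hypothetical names, and nothing here is taken from subscriber.rs.

```rust
use std::collections::BTreeMap;

/// Hypothetical tracker for at-least-once delivery with manual acknowledgement.
/// Not the crate's implementation; names and behavior are assumptions.
#[derive(Debug, Default)]
pub struct PendingAcks {
    next_id: u64,
    pending: BTreeMap<u64, String>,
}

impl PendingAcks {
    /// Records a delivered message and returns the id the subscriber must ack.
    pub fn deliver(&mut self, payload: &str) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.pending.insert(id, payload.to_string());
        id
    }

    /// Marks a delivery as acknowledged; returns false for unknown or repeated acks.
    pub fn ack(&mut self, id: u64) -> bool {
        self.pending.remove(&id).is_some()
    }

    /// Everything still unacknowledged, oldest first: the redelivery candidates.
    pub fn unacked(&self) -> Vec<(u64, String)> {
        self.pending.iter().map(|(id, p)| (*id, p.clone())).collect()
    }
}
```

At-most-once delivery would skip this tracking entirely, while exactly-once semantics typically need deduplication on the consumer side in addition to redelivery.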
cdc.rs
Change data capture:
- ChangeType - Insert, Update, Delete, Truncate
- ChangeEvent - Before/after data with metadata
- ChangeSource - Database, table, schema
- CdcConfig - Batch size, timeout, include_before
- CdcSourceConfig - Per-table CDC configuration
- CdcPosition - Track position for resumable streaming
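Resumable streaming usually comes down to remembering the last change that was handed out and continuing from there. The following sketch assumes a change log ordered by a monotonically increasing `sequence` number; that field, the `last_sequence` field, the string-typed before/after images, and the `next_batch` function are illustrative assumptions, not the definitions in cdc.rs.

```rust
#[derive(Debug, Clone, PartialEq)]
pub enum ChangeType {
    Insert,
    Update,
    Delete,
    Truncate,
}

/// Hypothetical change record; before/after images are plain strings here.
#[derive(Debug, Clone, PartialEq)]
pub struct ChangeEvent {
    pub sequence: u64,
    pub change_type: ChangeType,
    pub key: String,
    pub before: Option<String>,
    pub after: Option<String>,
}

/// Hypothetical position marker: the sequence number of the last change consumed.
#[derive(Debug, Clone, Copy, PartialEq, Default)]
pub struct CdcPosition {
    pub last_sequence: u64,
}

/// Returns up to `batch_size` changes after `pos` plus the position to resume from.
/// Assumes `log` is sorted by ascending `sequence`.
pub fn next_batch(
    log: &[ChangeEvent],
    pos: CdcPosition,
    batch_size: usize,
) -> (Vec<ChangeEvent>, CdcPosition) {
    let batch: Vec<ChangeEvent> = log
        .iter()
        .filter(|c| c.sequence > pos.last_sequence)
        .take(batch_size)
        .cloned()
        .collect();
    // An empty batch leaves the position unchanged.
    let new_pos = batch
        .last()
        .map_or(pos, |c| CdcPosition { last_sequence: c.sequence });
    (batch, new_pos)
}
```

Persisting the returned position after each batch is processed lets a consumer restart without rereading changes it has already handled.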
stream.rs
Stream processing:
- EventStream - Buffered event stream with max size
- StreamProcessor - Processing pipeline
- ProcessingStep - Filter, Map, TransformData
- WindowedStream - Time-windowed events
- Window - Time bucket with events
- AggregateFunction - Count, Sum, Avg, Min, Max
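Time-windowed aggregation can be illustrated with fixed-size (tumbling) windows over timestamped numeric samples. Whether stream.rs uses tumbling, sliding, or another windowing scheme is not stated here, so treat the bucketing rule, the `(timestamp_ms, value)` sample shape, and the function names below as assumptions.

```rust
use std::collections::BTreeMap;

#[derive(Debug, Clone, Copy)]
pub enum AggregateFunction {
    Count,
    Sum,
    Avg,
    Min,
    Max,
}

/// Groups (timestamp_ms, value) samples into tumbling windows keyed by window start.
pub fn window(samples: &[(u64, f64)], window_ms: u64) -> BTreeMap<u64, Vec<f64>> {
    assert!(window_ms > 0, "window size must be positive");
    let mut windows = BTreeMap::new();
    for &(ts, value) in samples {
        let start = ts - ts % window_ms; // round down to the window boundary
        windows.entry(start).or_insert_with(Vec::new).push(value);
    }
    windows
}

/// Applies one aggregate to a window's values; returns None for an empty window.
pub fn aggregate(values: &[f64], f: AggregateFunction) -> Option<f64> {
    if values.is_empty() {
        return None;
    }
    let sum: f64 = values.iter().sum();
    Some(match f {
        AggregateFunction::Count => values.len() as f64,
        AggregateFunction::Sum => sum,
        AggregateFunction::Avg => sum / values.len() as f64,
        AggregateFunction::Min => values.iter().cloned().fold(f64::INFINITY, f64::min),
        AggregateFunction::Max => values.iter().cloned().fold(f64::NEG_INFINITY, f64::max),
    })
}
```

Keying the windows in a `BTreeMap` keeps them in time order, so results can be emitted window by window as the stream advances.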
engine.rs
Core engine:
- StreamingEngine - Main entry point
- Channel management (create, delete, list)
- Publishing (single, batch, to multiple channels)
- Subscribing (with optional filters)
- History retrieval
- Statistics tracking
Usage Example
use aegis_streaming::*;
// Create engine
let engine = StreamingEngine::new();
// Create channel
engine.create_channel("events")?;
// Subscribe
let channel_id = ChannelId::new("events");
let mut receiver = engine.subscribe(&channel_id, "subscriber1")?;
// Publish event
let event = Event::new(
    EventType::Created,
    "users",
    EventData::Json(serde_json::json!({"id": 1, "name": "Alice"})),
);
engine.publish(&channel_id, event)?;
// Receive events (recv is async: call it from inside an async fn or block)
let received = receiver.recv().await?;
CDC Example
use aegis_streaming::*;
// Configure CDC
let config = CdcConfig::new()
    .with_source(CdcSourceConfig::new("mydb", "users"))
    .with_batch_size(100);
// Create change events
let change = ChangeEvent::insert(
    ChangeSource::new("mydb", "users"),
    "123".to_string(),
    serde_json::json!({"id": 123, "name": "Bob"}),
);
// Publish to channel (engine and channel_id as created in the Usage Example)
let event = change.to_event();
engine.publish(&channel_id, event)?;
Stream Processing Example
use aegis_streaming::*;
// Build processing pipeline
let processor = StreamProcessor::new()
    .filter(EventFilter::new().with_type(EventType::Created))
    .map(|mut e| {
        e.metadata.insert("processed".to_string(), "true".to_string());
        e
    });
// Process a single event, then a batch (event and events are assumed to be defined earlier)
let processed = processor.process(event);
let batch = processor.process_batch(events);
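For readers curious how a builder-style pipeline like the one above might work internally, here is a small generic sketch. It is not the crate's `StreamProcessor`: the `Pipeline` and `Step` names are hypothetical, filters take a closure rather than an `EventFilter`, and `process` is assumed to return `None` when a filter rejects the item.

```rust
/// Hypothetical pipeline step: either keep/drop an item or rewrite it.
enum Step<T> {
    Filter(Box<dyn Fn(&T) -> bool>),
    Map(Box<dyn Fn(T) -> T>),
}

/// Hypothetical generic pipeline; steps run in the order they were added.
pub struct Pipeline<T> {
    steps: Vec<Step<T>>,
}

impl<T> Pipeline<T> {
    pub fn new() -> Self {
        Pipeline { steps: Vec::new() }
    }

    /// Appends a step that drops items for which `pred` returns false.
    pub fn filter(mut self, pred: impl Fn(&T) -> bool + 'static) -> Self {
        self.steps.push(Step::Filter(Box::new(pred)));
        self
    }

    /// Appends a step that transforms each item.
    pub fn map(mut self, f: impl Fn(T) -> T + 'static) -> Self {
        self.steps.push(Step::Map(Box::new(f)));
        self
    }

    /// Runs one item through every step; None means a filter rejected it.
    pub fn process(&self, mut item: T) -> Option<T> {
        for step in &self.steps {
            match step {
                Step::Filter(pred) => {
                    if !pred(&item) {
                        return None;
                    }
                }
                Step::Map(f) => {
                    item = f(item);
                }
            }
        }
        Some(item)
    }

    /// Runs a batch through the pipeline, keeping only the surviving items.
    pub fn process_batch(&self, items: Vec<T>) -> Vec<T> {
        items.into_iter().filter_map(|i| self.process(i)).collect()
    }
}
```

With integers standing in for events, `Pipeline::<i32>::new().filter(|n| *n % 2 == 0).map(|n| n * 10)` keeps even numbers and multiplies them by ten. Placing filters before maps avoids transforming items that would be discarded anyway.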
Tests
31 tests covering all modules.