s3 impl
This commit is contained in:
257
CONSUL_BLOCKING_QUERIES.md
Normal file
257
CONSUL_BLOCKING_QUERIES.md
Normal file
@@ -0,0 +1,257 @@
|
||||
# Consul KV Blocking Queries
|
||||
|
||||
## Overview
|
||||
|
||||
The Consul KV service now supports blocking queries (also known as long polling), which allows clients to efficiently wait for changes to keys without continuous polling.
|
||||
|
||||
## How It Works
|
||||
|
||||
When a client makes a GET request with blocking query parameters, the request will:
|
||||
|
||||
1. **Return immediately** if the key's ModifyIndex is greater than the provided index
|
||||
2. **Block and wait** if the index matches the current value, until either:
|
||||
- The key is updated (new ModifyIndex)
|
||||
- The wait timeout expires
|
||||
3. **Return the current state** when either condition is met
|
||||
|
||||
## Usage
|
||||
|
||||
### Query Parameters
|
||||
|
||||
- `index` - The ModifyIndex from a previous request. The query will block until the key's index changes
|
||||
- `wait` - Duration to wait for changes (e.g., `5s`, `10m`, `1h`). Defaults to 5 minutes if not specified
|
||||
|
||||
### Example: Basic Blocking Query
|
||||
|
||||
```bash
|
||||
# Get the current state
|
||||
curl http://localhost:8500/v1/kv/my-key
|
||||
|
||||
# Response includes ModifyIndex
|
||||
# [{"CreateIndex":1,"ModifyIndex":5,"Key":"my-key","Value":"..."}]
|
||||
|
||||
# Block until the key changes, up to 30 seconds
|
||||
curl "http://localhost:8500/v1/kv/my-key?index=5&wait=30s"
|
||||
|
||||
# This request will:
|
||||
# - Return immediately if ModifyIndex > 5
|
||||
# - Wait up to 30s if ModifyIndex == 5
|
||||
# - Return with new value when key is updated
|
||||
```
|
||||
|
||||
### Example: Watch Loop in JavaScript
|
||||
|
||||
```javascript
|
||||
async function watchKey(key) {
|
||||
let currentIndex = 0;
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
// Make blocking query
|
||||
const response = await fetch(`http://localhost:8500/v1/kv/${key}?index=${currentIndex}&wait=60s`);
|
||||
|
||||
if (response.ok) {
|
||||
const data = await response.json();
|
||||
const newIndex = data[0].ModifyIndex;
|
||||
|
||||
// Check if the value actually changed
|
||||
if (newIndex > currentIndex) {
|
||||
console.log('Key changed:', data[0]);
|
||||
currentIndex = newIndex;
|
||||
|
||||
// Process the change
|
||||
handleChange(data[0]);
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('Watch error:', error);
|
||||
await new Promise(r => setTimeout(r, 1000)); // Backoff
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Example: Terraform State Watching
|
||||
|
||||
```javascript
|
||||
// Watch for Terraform state changes
|
||||
async function watchTerraformState(statePath) {
|
||||
let index = 0;
|
||||
|
||||
while (true) {
|
||||
const response = await fetch(`http://localhost:8500/v1/kv/${statePath}?index=${index}&wait=5m`);
|
||||
|
||||
if (response.ok) {
|
||||
const [entry] = await response.json();
|
||||
|
||||
if (entry.ModifyIndex > index) {
|
||||
const stateData = JSON.parse(Buffer.from(entry.Value, 'base64').toString('utf-8'));
|
||||
|
||||
console.log('Terraform state updated:', {
|
||||
version: stateData.version,
|
||||
serial: stateData.serial,
|
||||
modifyIndex: entry.ModifyIndex,
|
||||
});
|
||||
|
||||
index = entry.ModifyIndex;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
watchTerraformState('terraform/myproject/state');
|
||||
```
|
||||
|
||||
## Implementation Details
|
||||
|
||||
### Concurrent Watchers
|
||||
|
||||
Multiple clients can simultaneously watch the same key. When the key is updated, all waiting requests will be notified and return with the new value.
|
||||
|
||||
### Performance
|
||||
|
||||
- Minimal server overhead - no continuous polling
|
||||
- Automatic cleanup of watchers on timeout
|
||||
- Notifications are delivered immediately when changes occur
|
||||
- No database polling - uses in-memory event system
|
||||
|
||||
### Wait Durations
|
||||
|
||||
Supported time units:
|
||||
|
||||
- `s` - seconds (e.g., `30s`)
|
||||
- `m` - minutes (e.g., `5m`)
|
||||
- `h` - hours (e.g., `1h`)
|
||||
|
||||
Default: 5 minutes if not specified
|
||||
|
||||
### Edge Cases
|
||||
|
||||
#### Non-existent Keys
|
||||
|
||||
Blocking queries on non-existent keys with `index=0` will return 404 immediately. To wait for a key to be created, you need to handle 404 responses and retry.
|
||||
|
||||
#### Key Deletion
|
||||
|
||||
If a watched key is deleted, waiting queries will be notified and return 404.
|
||||
|
||||
#### Recursive Queries
|
||||
|
||||
Blocking queries do not currently support `recurse=true`. Use blocking queries only on specific keys.
|
||||
|
||||
## Testing
|
||||
|
||||
The test suite includes comprehensive blocking query tests:
|
||||
|
||||
```bash
|
||||
npm test -- consul-kv.spec.ts
|
||||
```
|
||||
|
||||
Tests verify:
|
||||
|
||||
- ✅ Immediate return when index is outdated
|
||||
- ✅ Blocking until key changes
|
||||
- ✅ Timeout behavior when no changes occur
|
||||
- ✅ Multiple concurrent watchers on same key
|
||||
- ✅ Non-existent key handling
|
||||
|
||||
## Comparison with Real Consul
|
||||
|
||||
This implementation matches Consul's blocking query behavior for:
|
||||
|
||||
- Standard wait/index semantics
|
||||
- Multiple concurrent watchers
|
||||
- Timeout handling
|
||||
- Immediate return on index mismatch
|
||||
|
||||
Differences from real Consul:
|
||||
|
||||
- No support for `recurse` with blocking queries
|
||||
- Simplified implementation (in-memory events vs raft log)
|
||||
- Wait times capped at request timeout (no separate max wait limit)
|
||||
|
||||
## Use Cases
|
||||
|
||||
### Configuration Management
|
||||
|
||||
Watch for configuration changes and reload application settings:
|
||||
|
||||
```javascript
|
||||
async function watchConfig(configKey) {
|
||||
let index = 0;
|
||||
|
||||
while (true) {
|
||||
const response = await fetch(`http://localhost:8500/v1/kv/${configKey}?index=${index}&wait=5m`);
|
||||
|
||||
if (response.ok) {
|
||||
const [entry] = await response.json();
|
||||
if (entry.ModifyIndex > index) {
|
||||
const config = JSON.parse(Buffer.from(entry.Value, 'base64').toString('utf-8'));
|
||||
reloadConfig(config);
|
||||
index = entry.ModifyIndex;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Service Coordination
|
||||
|
||||
Wait for another service to signal readiness:
|
||||
|
||||
```javascript
|
||||
async function waitForService(serviceKey) {
|
||||
let index = 0;
|
||||
|
||||
while (true) {
|
||||
const response = await fetch(`http://localhost:8500/v1/kv/${serviceKey}/ready?index=${index}&wait=1m`);
|
||||
|
||||
if (response.ok) {
|
||||
const [entry] = await response.json();
|
||||
const value = Buffer.from(entry.Value, 'base64').toString('utf-8');
|
||||
|
||||
if (value === 'true') {
|
||||
console.log('Service is ready!');
|
||||
return;
|
||||
}
|
||||
|
||||
index = entry.ModifyIndex;
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### State Change Notifications
|
||||
|
||||
Monitor Terraform state for external changes:
|
||||
|
||||
```javascript
|
||||
async function monitorTerraformState(statePath) {
|
||||
let lastSerial = 0;
|
||||
let index = 0;
|
||||
|
||||
while (true) {
|
||||
const response = await fetch(`http://localhost:8500/v1/kv/${statePath}?index=${index}&wait=10m`);
|
||||
|
||||
if (response.ok) {
|
||||
const [entry] = await response.json();
|
||||
|
||||
if (entry.ModifyIndex > index) {
|
||||
const state = JSON.parse(Buffer.from(entry.Value, 'base64').toString('utf-8'));
|
||||
|
||||
if (state.serial > lastSerial) {
|
||||
console.log(`State changed: serial ${lastSerial} → ${state.serial}`);
|
||||
lastSerial = state.serial;
|
||||
|
||||
// Notify about drift
|
||||
if (await detectDrift(state)) {
|
||||
notifyTeam('Terraform drift detected!');
|
||||
}
|
||||
}
|
||||
|
||||
index = entry.ModifyIndex;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
Reference in New Issue
Block a user