# Consul KV Blocking Queries

## Overview

The Consul KV service now supports blocking queries (also known as long polling), which let clients wait efficiently for changes to a key instead of polling continuously.

## How It Works

When a client makes a GET request with blocking query parameters, the request will:

1. **Return immediately** if the key's ModifyIndex is greater than the provided index
2. **Block and wait** if the provided index matches the key's current ModifyIndex, until either:
   - The key is updated (new ModifyIndex)
   - The wait timeout expires
3. **Return the current state** when either condition is met
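
The decision in steps 1 and 2 can be sketched as a small pure function. This is an illustration only, not the service's actual code: `decideBlockingAction` and its return values are hypothetical names, and returning `'not-found'` for every missing key is an assumption (this document only specifies the 404 for `index=0`).

```javascript
// Hypothetical helper: decide how to handle a blocking GET.
// `entry` is the stored KV entry (or undefined if the key does not exist);
// `clientIndex` is the numeric `index` query parameter (0 if absent).
function decideBlockingAction(entry, clientIndex) {
  // Assumption: a missing key is answered right away with a 404.
  if (!entry) return 'not-found';

  // No usable index supplied: behave like a plain GET.
  if (clientIndex <= 0) return 'respond-now';

  // The key changed after the client last saw it: answer immediately.
  if (entry.ModifyIndex > clientIndex) return 'respond-now';

  // Client is up to date: hold the request until an update or the timeout.
  return 'wait';
}
```

With the entry from the curl example in this document (`ModifyIndex` 5), a request with `index=5` would wait, while a request with `index=4` would be answered immediately.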

## Usage

### Query Parameters

- `index` - The ModifyIndex from a previous request. The query blocks until the key's ModifyIndex exceeds this value or the wait expires
- `wait` - Maximum duration to wait for changes (e.g., `5s`, `10m`, `1h`). Defaults to 5 minutes if not specified
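
As a rough sketch of how a client might assemble these parameters, the helper below builds a request URL with the standard `URL` API. The name `buildBlockingUrl` and its options object are hypothetical and not part of this service.

```javascript
// Hypothetical helper: build a blocking-query URL for a KV key.
function buildBlockingUrl(baseUrl, key, { index, wait } = {}) {
  // Encode each path segment but keep the slashes that separate them.
  const path = key.split('/').map(encodeURIComponent).join('/');
  const url = new URL(`/v1/kv/${path}`, baseUrl);

  // Only add the parameters the caller actually supplied.
  if (index !== undefined) url.searchParams.set('index', String(index));
  if (wait !== undefined) url.searchParams.set('wait', wait);

  return url.toString();
}

const exampleUrl = buildBlockingUrl('http://localhost:8500', 'my-key', { index: 5, wait: '30s' });
```

Omitting `wait` leaves the server default of 5 minutes in effect.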

### Example: Basic Blocking Query

```bash
# Get the current state
curl http://localhost:8500/v1/kv/my-key

# Response includes ModifyIndex
# [{"CreateIndex":1,"ModifyIndex":5,"Key":"my-key","Value":"..."}]

# Block until the key changes, up to 30 seconds
curl "http://localhost:8500/v1/kv/my-key?index=5&wait=30s"

# This request will:
# - Return immediately if ModifyIndex > 5
# - Wait up to 30s if ModifyIndex == 5
# - Return with new value when key is updated
```

### Example: Watch Loop in JavaScript

```javascript
async function watchKey(key) {
  let currentIndex = 0;

  while (true) {
    try {
      // Make blocking query
      const response = await fetch(`http://localhost:8500/v1/kv/${key}?index=${currentIndex}&wait=60s`);

      if (response.ok) {
        const data = await response.json();
        const newIndex = data[0].ModifyIndex;

        // Check if the value actually changed
        if (newIndex > currentIndex) {
          console.log('Key changed:', data[0]);
          currentIndex = newIndex;

          // Process the change (handleChange is application-defined)
          handleChange(data[0]);
        }
      } else {
        // Non-2xx (e.g. 404 for a missing key): pause so the loop does not spin
        await new Promise(r => setTimeout(r, 1000));
      }
    } catch (error) {
      console.error('Watch error:', error);
      await new Promise(r => setTimeout(r, 1000)); // Backoff
    }
  }
}
```
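
Consul's own guidance for blocking-query clients is to reset the index to 0 if a returned index ever goes backwards, and to treat indexes below 1 as invalid. Whether this implementation can ever return a lower index is not stated here, so the following is a defensive sketch only; `nextWatchIndex` is a hypothetical helper name.

```javascript
// Hypothetical helper: choose the index to send on the next blocking query.
function nextWatchIndex(currentIndex, returnedIndex) {
  // A missing or non-positive index is not usable: start over from 0.
  if (!Number.isInteger(returnedIndex) || returnedIndex < 1) return 0;

  // The index went backwards (e.g. the store was reset): start over from 0.
  if (returnedIndex < currentIndex) return 0;

  return returnedIndex;
}
```

In the watch loop, `currentIndex = nextWatchIndex(currentIndex, data[0].ModifyIndex)` could replace the direct assignment.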

### Example: Terraform State Watching

```javascript
// Watch for Terraform state changes
async function watchTerraformState(statePath) {
  let index = 0;

  while (true) {
    const response = await fetch(`http://localhost:8500/v1/kv/${statePath}?index=${index}&wait=5m`);

    if (response.ok) {
      const [entry] = await response.json();

      if (entry.ModifyIndex > index) {
        // Value is base64-encoded; Buffer is Node.js-specific
        const stateData = JSON.parse(Buffer.from(entry.Value, 'base64').toString('utf-8'));

        console.log('Terraform state updated:', {
          version: stateData.version,
          serial: stateData.serial,
          modifyIndex: entry.ModifyIndex,
        });

        index = entry.ModifyIndex;
      }
    } else {
      // Avoid a tight loop while the key is missing or the server errors
      await new Promise(r => setTimeout(r, 1000));
    }
  }
}

watchTerraformState('terraform/myproject/state');
```

## Implementation Details

### Concurrent Watchers

Multiple clients can simultaneously watch the same key. When the key is updated, all waiting requests are notified and return with the new value.
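
One way such fan-out could work is a per-key set of pending waiters that are all resolved on update. The sketch below is an assumption about the general technique, not the service's real implementation; the `KeyWatchers` class and its methods are hypothetical.

```javascript
// Hypothetical in-memory registry of requests waiting on a key.
class KeyWatchers {
  constructor() {
    this.waiters = new Map(); // key -> Set of "finish" callbacks
  }

  // Resolves with the updated entry, or with null if the timeout expires first.
  waitForChange(key, timeoutMs) {
    return new Promise((resolve) => {
      if (!this.waiters.has(key)) this.waiters.set(key, new Set());
      const waiters = this.waiters.get(key);

      const finish = (entry) => {
        clearTimeout(timer);
        // Clean up so finished or timed-out watchers do not accumulate.
        waiters.delete(finish);
        if (waiters.size === 0) this.waiters.delete(key);
        resolve(entry);
      };
      const timer = setTimeout(() => finish(null), timeoutMs);
      waiters.add(finish);
    });
  }

  // Called after a write: wake every request currently waiting on the key.
  notify(key, entry) {
    const waiters = this.waiters.get(key);
    if (!waiters) return;
    for (const finish of [...waiters]) finish(entry);
  }
}
```

Each waiter removes itself on either path, which mirrors the "automatic cleanup of watchers on timeout" behavior described in this document.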

### Performance

- Minimal server overhead - no continuous polling
- Automatic cleanup of watchers on timeout
- Notifications are delivered immediately when changes occur
- No database polling - uses in-memory event system

### Wait Durations

Supported time units:

- `s` - seconds (e.g., `30s`)
- `m` - minutes (e.g., `5m`)
- `h` - hours (e.g., `1h`)

Default: 5 minutes if not specified
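
A minimal sketch of parsing these durations into milliseconds follows. The function name `parseWaitDuration` is hypothetical, and throwing on unsupported input is an assumption; the actual service may handle invalid values differently.

```javascript
// Milliseconds per supported unit.
const UNIT_MS = { s: 1000, m: 60 * 1000, h: 60 * 60 * 1000 };
const DEFAULT_WAIT_MS = 5 * 60 * 1000; // 5 minutes

// Hypothetical parser for the `wait` query parameter.
function parseWaitDuration(wait) {
  // Fall back to the documented default when the parameter is absent.
  if (wait === undefined || wait === '') return DEFAULT_WAIT_MS;

  // Accept a whole number followed by exactly one of s, m, or h.
  const match = /^(\d+)([smh])$/.exec(wait);
  if (!match) throw new Error(`Unsupported wait duration: ${wait}`);

  return Number(match[1]) * UNIT_MS[match[2]];
}
```

For instance, `parseWaitDuration('30s')` yields `30000` and `parseWaitDuration('1h')` yields `3600000`.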

### Edge Cases

#### Non-existent Keys

Blocking queries on non-existent keys with `index=0` return 404 immediately. To wait for a key to be created, handle 404 responses and retry.
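
The retry approach might look like the sketch below. `waitForKey`, its options, and the injectable `fetchImpl` (included so the function can be exercised without a server) are hypothetical; the delay and attempt limits are arbitrary illustrative values.

```javascript
// Hypothetical helper: poll until a key exists, then return its entry.
async function waitForKey(url, { retryDelayMs = 1000, maxAttempts = 30, fetchImpl = fetch } = {}) {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const response = await fetchImpl(url);

    if (response.ok) {
      // The KV endpoint returns an array with a single entry.
      const [entry] = await response.json();
      return entry;
    }

    // Anything other than "not found" is treated as a real error.
    if (response.status !== 404) {
      throw new Error(`Unexpected status ${response.status}`);
    }

    // Key does not exist yet: pause before trying again.
    await new Promise((resolve) => setTimeout(resolve, retryDelayMs));
  }
  throw new Error(`Key did not appear after ${maxAttempts} attempts`);
}
```

Once the entry is returned, its `ModifyIndex` can seed a normal blocking watch loop.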

#### Key Deletion

If a watched key is deleted, waiting queries are notified and return 404.
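
A watcher can tell a deletion apart from a key that never existed by checking whether it has already seen an index for that key. The classification below is a sketch under that assumption; `classifyWatchResponse` and its labels are hypothetical.

```javascript
// Hypothetical helper: interpret the result of one blocking query.
// `status` is the HTTP status, `entry` the decoded entry (if any),
// `lastIndex` the last ModifyIndex this watcher observed (0 if none).
function classifyWatchResponse(status, entry, lastIndex) {
  if (status === 404) {
    // Seen before and now gone: deleted. Never seen: not created yet.
    return lastIndex > 0 ? 'deleted' : 'missing';
  }
  if (status !== 200) return 'error';

  // A higher ModifyIndex means a real change; otherwise the wait timed out.
  return entry.ModifyIndex > lastIndex ? 'changed' : 'unchanged';
}
```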

#### Recursive Queries

Blocking queries do not currently support `recurse=true`. Use blocking queries only on specific keys.
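
When several keys under a prefix need watching, one possible workaround is to run an independent watch per key. The sketch below assumes a per-key watch function such as the `watchKey` loop from this document is passed in; `watchKeys` itself is a hypothetical name.

```javascript
// Hypothetical helper: start one independent watch per key.
// `watchOne` is any async function that watches a single key.
function watchKeys(keys, watchOne) {
  // allSettled keeps one failing watcher from hiding the others' results.
  return Promise.allSettled(keys.map((key) => watchOne(key)));
}
```

Each key holds its own open request, so this suits a small, known set of keys rather than large prefixes.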

## Testing

The test suite includes comprehensive blocking query tests:

```bash
npm test -- consul-kv.spec.ts
```

Tests verify:

- ✅ Immediate return when index is outdated
- ✅ Blocking until key changes
- ✅ Timeout behavior when no changes occur
- ✅ Multiple concurrent watchers on same key
- ✅ Non-existent key handling

## Comparison with Real Consul

This implementation matches Consul's blocking query behavior for:

- Standard wait/index semantics
- Multiple concurrent watchers
- Timeout handling
- Immediate return on index mismatch

Differences from real Consul:

- No support for `recurse` with blocking queries
- Simplified implementation (in-memory events vs. Raft log)
- Wait times are capped only by the request timeout; there is no separate maximum wait (real Consul limits `wait` to 10 minutes)

## Use Cases

### Configuration Management

Watch for configuration changes and reload application settings:

```javascript
async function watchConfig(configKey) {
  let index = 0;

  while (true) {
    const response = await fetch(`http://localhost:8500/v1/kv/${configKey}?index=${index}&wait=5m`);

    if (response.ok) {
      const [entry] = await response.json();
      if (entry.ModifyIndex > index) {
        // Value is base64-encoded JSON; reloadConfig is application-defined
        const config = JSON.parse(Buffer.from(entry.Value, 'base64').toString('utf-8'));
        reloadConfig(config);
        index = entry.ModifyIndex;
      }
    } else {
      // Avoid a tight loop while the key is missing or the server errors
      await new Promise(r => setTimeout(r, 1000));
    }
  }
}
```

### Service Coordination

Wait for another service to signal readiness:

```javascript
async function waitForService(serviceKey) {
  let index = 0;

  while (true) {
    const response = await fetch(`http://localhost:8500/v1/kv/${serviceKey}/ready?index=${index}&wait=1m`);

    if (response.ok) {
      const [entry] = await response.json();
      const value = Buffer.from(entry.Value, 'base64').toString('utf-8');

      if (value === 'true') {
        console.log('Service is ready!');
        return;
      }

      index = entry.ModifyIndex;
    } else {
      // The ready key may not exist yet (404): pause, then retry
      await new Promise(r => setTimeout(r, 1000));
    }
  }
}
```

### State Change Notifications

Monitor Terraform state for external changes:

```javascript
async function monitorTerraformState(statePath) {
  let lastSerial = 0;
  let index = 0;

  while (true) {
    const response = await fetch(`http://localhost:8500/v1/kv/${statePath}?index=${index}&wait=10m`);

    if (response.ok) {
      const [entry] = await response.json();

      if (entry.ModifyIndex > index) {
        const state = JSON.parse(Buffer.from(entry.Value, 'base64').toString('utf-8'));

        if (state.serial > lastSerial) {
          console.log(`State changed: serial ${lastSerial} → ${state.serial}`);
          lastSerial = state.serial;

          // Notify about drift (detectDrift and notifyTeam are application-defined)
          if (await detectDrift(state)) {
            notifyTeam('Terraform drift detected!');
          }
        }

        index = entry.ModifyIndex;
      }
    } else {
      // Avoid a tight loop while the key is missing or the server errors
      await new Promise(r => setTimeout(r, 1000));
    }
  }
}
```