Performance Tuning Guide: Structured Concurrency
This guide covers performance tuning for VectorWave's structured concurrency implementation and shows how to configure it for specific workloads.
Table of Contents
- Performance Overview
- Configuration Parameters
- Adaptive Threshold Tuning
- Workload-Specific Optimization
- Monitoring and Profiling
- Hardware Considerations
- Common Performance Issues
- Benchmarking Guidelines
Performance Overview
Key Performance Factors
- Parallelization Threshold: When to switch from sequential to parallel processing
- Thread Pool Sizing: Optimal number of worker threads
- Chunk Size: How to divide work among threads
- Memory Layout: Cache-friendly data organization
- SIMD Integration: Vector API optimization coordination
Performance Characteristics
| Workload Type | Optimal Configuration | Expected Speedup |
|---|---|---|
| Small Signals (< 512 samples) | Sequential processing | 1.0x (no overhead) |
| Medium Signals (512-4096 samples) | Moderate parallelization | 1.5-2.5x |
| Large Signals (> 4096 samples) | Full parallelization | 2.0-4.0x |
| Batch Processing (many signals) | High parallelization | 3.0-6.0x |
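The size bands in the table can be read as a simple dispatch rule. The sketch below is only an illustration: StrategyPicker, ProcessingStrategy, and chooseStrategy are hypothetical names, not VectorWave API, and the 512 and 4096 cut-offs are copied from the table rather than measured. The batch row is not modeled because it depends on the number of signals, not on signal length.

```java
/** Hypothetical helper mirroring the per-signal size bands in the table (not VectorWave API). */
final class StrategyPicker {
    enum ProcessingStrategy { SEQUENTIAL, MODERATE_PARALLEL, FULL_PARALLEL }

    /** Maps a signal length to the band it falls into: < 512, 512-4096, > 4096. */
    static ProcessingStrategy chooseStrategy(int sampleCount) {
        if (sampleCount < 0) {
            throw new IllegalArgumentException("sampleCount must be non-negative");
        }
        if (sampleCount < 512) {
            return ProcessingStrategy.SEQUENTIAL;
        }
        if (sampleCount <= 4096) {
            return ProcessingStrategy.MODERATE_PARALLEL;
        }
        return ProcessingStrategy.FULL_PARALLEL;
    }

    public static void main(String[] args) {
        for (int n : new int[] {256, 512, 4096, 8192}) {
            System.out.println(n + " samples -> " + chooseStrategy(n));
        }
    }
}
```

Treat the cut-offs as starting points; the threshold sweep described later in this guide is the way to confirm them on a given machine.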
Configuration Parameters
ParallelConfig Builder
ParallelConfig config = ParallelConfig.builder()
.targetCores(cores) // Number of worker threads
.minParallelThreshold(threshold) // Minimum work size for parallelization
.maxChunkSize(chunkSize) // Maximum work unit size
.build();
Parameter Guidelines
targetCores
- CPU-bound tasks: Runtime.getRuntime().availableProcessors()
- I/O-bound tasks: availableProcessors() * 2
- Mixed workloads: availableProcessors() * 1.5
- NUMA systems: Consider topology-aware allocation
// Automatic detection
int optimalCores = Math.min(
Runtime.getRuntime().availableProcessors(),
estimateOptimalThreads(workloadCharacteristics)
);
ParallelConfig config = ParallelConfig.builder()
.targetCores(optimalCores)
.build();
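The helper estimateOptimalThreads is not defined in this guide. One possible shape, following the multipliers in the targetCores guidelines, is sketched below. CoreSizing, WorkloadKind, and suggestTargetCores are hypothetical names introduced for illustration, and rounding the mixed case up is an assumption.

```java
/** Hypothetical sizing helper based on the targetCores guidelines (not VectorWave API). */
final class CoreSizing {
    enum WorkloadKind { CPU_BOUND, IO_BOUND, MIXED }

    /** Returns a suggested worker count for the given workload kind and processor count. */
    static int suggestTargetCores(WorkloadKind kind, int availableProcessors) {
        if (availableProcessors < 1) {
            throw new IllegalArgumentException("availableProcessors must be >= 1");
        }
        return switch (kind) {
            case CPU_BOUND -> availableProcessors;
            case IO_BOUND -> availableProcessors * 2;
            // Assumption: round 1.5x up so a single-core machine still gets 2 workers
            case MIXED -> (int) Math.ceil(availableProcessors * 1.5);
        };
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        for (WorkloadKind kind : WorkloadKind.values()) {
            System.out.println(kind + " -> " + suggestTargetCores(kind, cores));
        }
    }
}
```

Note that wrapping the result in Math.min(availableProcessors, ...), as the automatic-detection snippet does, caps it at the processor count, so the I/O-bound and mixed multipliers only take effect if that cap is dropped.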
minParallelThreshold
Controls when to use parallel vs sequential processing:
// For different signal characteristics
int threshold = switch (signalType) {
case AUDIO_SAMPLES -> 256; // Small audio chunks
case FINANCIAL_TICKS -> 512; // Financial time series
case SCIENTIFIC_DATA -> 1024; // Large scientific datasets
case REAL_TIME -> 128; // Low-latency processing
};
maxChunkSize
Limits work unit size for better cache utilization:
// Cache-aware chunk sizing
int l3CacheSize = estimateL3CacheSize(); // e.g., 32MB
int sampleSize = Double.BYTES;
int optimalChunkSize = Math.min(
8192, // Default maximum
l3CacheSize / (sampleSize * 4) // Account for working set
);
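The JDK has no portable API for querying cache sizes, so estimateL3CacheSize has to be platform-specific. The sketch below shows one way it might look on Linux, reading the sysfs cache description and falling back to a fixed guess elsewhere. The class name, the 8 MB fallback, and the assumption that cache index3 is the L3 cache are all illustrative, not part of VectorWave.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Locale;

/** Hypothetical cache probe and chunk-size calculation (not VectorWave API). */
final class CacheSizing {
    /** Assumed default used when the cache size cannot be detected. */
    static final long FALLBACK_L3_BYTES = 8L * 1024 * 1024;

    /** Parses sysfs-style sizes such as "32768K" or "32M" into bytes. */
    static long parseCacheSize(String text) {
        String s = text.trim().toUpperCase(Locale.ROOT);
        if (s.isEmpty()) {
            throw new IllegalArgumentException("empty cache size");
        }
        long multiplier = 1;
        char unit = s.charAt(s.length() - 1);
        if (unit == 'K') {
            multiplier = 1024L;
        } else if (unit == 'M') {
            multiplier = 1024L * 1024;
        } else if (unit == 'G') {
            multiplier = 1024L * 1024 * 1024;
        }
        String digits = (multiplier == 1) ? s : s.substring(0, s.length() - 1);
        return Long.parseLong(digits) * multiplier;
    }

    /** Linux-only probe; index3 is usually the L3 cache, but that is not guaranteed. */
    static long estimateL3CacheSize() {
        Path sizeFile = Path.of("/sys/devices/system/cpu/cpu0/cache/index3/size");
        try {
            return parseCacheSize(Files.readString(sizeFile));
        } catch (IOException | RuntimeException e) {
            return FALLBACK_L3_BYTES;
        }
    }

    /** Caps the chunk size so that workingSetFactor arrays of samples fit in the cache. */
    static int cacheAwareChunkSize(long cacheBytes, int sampleBytes, int workingSetFactor, int defaultMax) {
        long fit = cacheBytes / ((long) sampleBytes * workingSetFactor);
        return (int) Math.max(1, Math.min(defaultMax, fit));
    }

    public static void main(String[] args) {
        long l3 = estimateL3CacheSize();
        System.out.println("L3 estimate: " + l3 + " bytes, chunk size: "
                + cacheAwareChunkSize(l3, Double.BYTES, 4, 8192));
    }
}
```

With a 32 MB cache and 8-byte samples the cache term is about one million samples, so the 8192 default is what actually binds; the cache-derived limit only matters on small caches (for example, 128 KB gives 4096).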
Workload-Specific Configurations
High-Throughput Batch Processing
ParallelConfig batchConfig = ParallelConfig.builder()
.targetCores(Runtime.getRuntime().availableProcessors())
.minParallelThreshold(128) // Low threshold for high parallelization
.maxChunkSize(4096)
.build();
StructuredParallelTransform transform = new StructuredParallelTransform(
wavelet, PaddingStrategies.PERIODIC, batchConfig);
Low-Latency Real-Time Processing
ParallelConfig realTimeConfig = ParallelConfig.builder()
.targetCores(Math.min(4, Runtime.getRuntime().availableProcessors()))
.minParallelThreshold(256) // Higher threshold to avoid overhead
.maxChunkSize(2048)
.build();
Memory-Constrained Environments
ParallelConfig memoryConfig = ParallelConfig.builder()
.targetCores(Runtime.getRuntime().availableProcessors() / 2)
.minParallelThreshold(1024) // Higher threshold
.maxChunkSize(2048) // Smaller chunks
.build();
Adaptive Threshold Tuning
Multi-Armed Bandit Optimization
VectorWave includes an adaptive threshold tuner that uses a multi-armed bandit approach to learn parallel thresholds from measured performance:
AdaptiveThresholdTuner tuner = new AdaptiveThresholdTuner();
// Process multiple batches with automatic optimization
for (int batch = 0; batch < numBatches; batch++) {
// Get adaptive threshold for current workload
int adaptiveThreshold = tuner.getAdaptiveThreshold(
AdaptiveThresholdTuner.OperationType.MODWT_DECOMPOSE,
signals[0].length, // Data size
1.0 // Complexity factor
);
ParallelConfig adaptiveConfig = ParallelConfig.builder()
.minParallelThreshold(adaptiveThreshold)
.build();
long startTime = System.nanoTime();
try (var executor = new StructuredExecutor(adaptiveConfig)) {
List<MODWTResult> results = executor.invokeAll(signals,
signal -> transform.forward(signal));
long elapsed = System.nanoTime() - startTime;
// Record performance for future optimization
tuner.recordMeasurement(
AdaptiveThresholdTuner.OperationType.MODWT_DECOMPOSE,
signals[0].length,
adaptiveThreshold,
elapsed,
estimatedSequentialTime
);
}
}
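To make the multi-armed bandit idea concrete, the sketch below implements a plain epsilon-greedy bandit over a fixed set of candidate thresholds. It is not the algorithm inside AdaptiveThresholdTuner, whose internals this guide does not describe; ThresholdBandit and all of its methods are hypothetical, and the cost fed to it is assumed to be something like elapsed nanoseconds per sample.

```java
import java.util.Random;

/** Epsilon-greedy bandit over candidate thresholds; illustrative only, not VectorWave's tuner. */
final class ThresholdBandit {
    private final int[] thresholds;
    private final double[] meanCost;
    private final int[] pulls;
    private final double epsilon;
    private final Random rng;

    ThresholdBandit(int[] thresholds, double epsilon, long seed) {
        if (thresholds.length == 0) {
            throw new IllegalArgumentException("at least one candidate threshold is required");
        }
        this.thresholds = thresholds.clone();
        this.meanCost = new double[thresholds.length];
        this.pulls = new int[thresholds.length];
        this.epsilon = epsilon;
        this.rng = new Random(seed);
    }

    /** Picks an arm: untried arms first, then explore with probability epsilon, else exploit. */
    int selectArm() {
        for (int i = 0; i < pulls.length; i++) {
            if (pulls[i] == 0) {
                return i;
            }
        }
        if (rng.nextDouble() < epsilon) {
            return rng.nextInt(thresholds.length);
        }
        return bestArm();
    }

    /** Records a measured cost for an arm using an incremental mean. */
    void record(int arm, double cost) {
        pulls[arm]++;
        meanCost[arm] += (cost - meanCost[arm]) / pulls[arm];
    }

    /** Index of the tried arm with the lowest mean cost, or 0 if nothing has been tried. */
    int bestArm() {
        int best = -1;
        for (int i = 0; i < pulls.length; i++) {
            if (pulls[i] > 0 && (best < 0 || meanCost[i] < meanCost[best])) {
                best = i;
            }
        }
        return Math.max(best, 0);
    }

    int threshold(int arm) {
        return thresholds[arm];
    }

    double meanCost(int arm) {
        return meanCost[arm];
    }
}
```

In a loop like the one above, selectArm would choose the threshold for the next batch and record would be called with the measured cost afterwards. A small epsilon (for example 0.1) keeps occasional exploration going so the tuner can react if the workload shifts.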
Tuning Parameters
Operation Types
// Different operations have different optimal thresholds
AdaptiveThresholdTuner.OperationType operationType = switch (transformType) {
case "MODWT_FORWARD" -> AdaptiveThresholdTuner.OperationType.MODWT_DECOMPOSE;
case "MODWT_INVERSE" -> AdaptiveThresholdTuner.OperationType.MODWT_RECONSTRUCT;
case "CWT_FORWARD" -> AdaptiveThresholdTuner.OperationType.CWT_ANALYSIS;
case "DENOISING" -> AdaptiveThresholdTuner.OperationType.MODWT_DECOMPOSE;
// A switch expression over a String must be exhaustive, so reject unknown types explicitly
default -> throw new IllegalArgumentException("Unknown transform type: " + transformType);
};
Complexity Factors
Adjust complexity based on wavelet and signal characteristics:
double complexityFactor = calculateComplexity(wavelet, PaddingStrategies.PERIODIC, signalNoise);
// More complex operations benefit from higher thresholds
double complexity = switch (wavelet.getName()) {
case "HAAR" -> 0.5; // Simple wavelet
case "DB4", "DB8" -> 1.0; // Standard complexity
case "BIOR_4_4" -> 1.2; // Biorthogonal (more complex)
case "COIF4" -> 1.3; // Coiflet (highest complexity)
default -> 1.0;
};
Convergence Monitoring
// Enable debug output to monitor tuner convergence
System.setProperty("debug.tuner", "true");
// Check convergence after warm-up period
if (batch > WARMUP_BATCHES) {
double confidence = tuner.getConfidenceLevel(operationType, dataSize);
if (confidence > 0.95) {
System.out.println("Threshold tuning has converged with high confidence");
}
}
Workload-Specific Optimization
Financial Time Series Analysis
// Configure for financial data characteristics
ParallelConfig financialConfig = ParallelConfig.builder()
.targetCores(Runtime.getRuntime().availableProcessors())
.minParallelThreshold(512) // Financial ticks are typically small
.maxChunkSize(2048)
.build();
// Use Paul wavelet for asymmetric crash detection
StructuredParallelTransform financialTransform = new StructuredParallelTransform(
new PaulWavelet(4), PaddingStrategies.PERIODIC, financialConfig);
// Process with progress monitoring for large datasets
MODWTResult[] results = financialTransform.forwardBatchWithProgress(priceSignals,
(completed, total) -> {
if (completed % 100 == 0) { // Update every 100 signals
System.out.printf("Financial analysis: %d/%d signals processed%n",
completed, total);
}
});
Scientific Computing
// High-precision scientific computing configuration
ParallelConfig scientificConfig = ParallelConfig.builder()
.targetCores(Runtime.getRuntime().availableProcessors())
.minParallelThreshold(1024) // Larger thresholds for complex computations
.maxChunkSize(8192)
.build();
// Use Daubechies wavelets for good frequency localization
StructuredParallelTransform scientificTransform = new StructuredParallelTransform(
Daubechies.DB8, PaddingStrategies.ZERO, scientificConfig);
Real-Time Audio Processing
// Low-latency configuration for audio processing
ParallelConfig audioConfig = ParallelConfig.builder()
.targetCores(4) // Limit cores to avoid thread scheduling overhead
.minParallelThreshold(256) // Audio frames are small
.maxChunkSize(1024)
.build();
// Use Haar for fast processing
StructuredParallelTransform audioTransform = new StructuredParallelTransform(
new Haar(), PaddingStrategies.PERIODIC, audioConfig);
// Process with timeout for real-time constraints
// Declare results outside the try block so it stays in scope after the fallback
MODWTResult[] results;
try {
results = audioTransform.forwardBatchWithTimeout(audioFrames, 10); // 10ms deadline
} catch (StructuredParallelTransform.ComputationException e) {
// Fall back to sequential processing when the deadline is missed
results = sequentialTransform.forwardBatch(audioFrames);
}
Monitoring and Profiling
Built-in Performance Monitoring
// Enable performance monitoring
System.setProperty("debug.executor", "true");
try (var executor = new StructuredExecutor(config)) {
long startTime = System.nanoTime();
List<MODWTResult> results = executor.invokeAll(signals,
signal -> transform.forward(signal));
long elapsed = System.nanoTime() - startTime;
// Calculate performance metrics
double signalsPerSecond = (signals.size() * 1e9) / elapsed;
double samplesPerSecond = (totalSamples * 1e9) / elapsed;
double parallelEfficiency = calculateEfficiency(elapsed, sequentialTime, coreCount);
System.out.printf("Performance: %.1f signals/sec, %.1f samples/sec, %.1f%% efficiency%n",
signalsPerSecond, samplesPerSecond, parallelEfficiency * 100);
}
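The monitoring snippet calls calculateEfficiency without defining it. A minimal version, assuming the usual definition of parallel efficiency as speedup divided by core count, might look like the following. ParallelMetrics is a hypothetical class; the argument order matches the call above.

```java
/** Hypothetical metric helpers; parallel efficiency = (sequential / parallel) / cores. */
final class ParallelMetrics {
    /** Ratio of sequential to parallel wall-clock time. */
    static double speedup(long sequentialNanos, long parallelNanos) {
        if (parallelNanos <= 0) {
            throw new IllegalArgumentException("parallelNanos must be positive");
        }
        return (double) sequentialNanos / parallelNanos;
    }

    /** Fraction of ideal linear scaling achieved; 1.0 means perfect scaling. */
    static double calculateEfficiency(long parallelNanos, long sequentialNanos, int coreCount) {
        if (coreCount < 1) {
            throw new IllegalArgumentException("coreCount must be >= 1");
        }
        return speedup(sequentialNanos, parallelNanos) / coreCount;
    }

    /** Converts an item count and elapsed nanoseconds into items per second. */
    static double perSecond(long itemCount, long elapsedNanos) {
        if (elapsedNanos <= 0) {
            throw new IllegalArgumentException("elapsedNanos must be positive");
        }
        return itemCount * 1e9 / elapsedNanos;
    }

    public static void main(String[] args) {
        // Example numbers only: 1000 ns sequential vs 250 ns parallel on 8 cores
        System.out.printf("Efficiency: %.1f%%%n", calculateEfficiency(250, 1000, 8) * 100);
    }
}
```

An efficiency well below 1.0 is normal; values near 1 / coreCount mean the parallel path is running at roughly sequential speed and the threshold is probably too low.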
JMH Benchmarking
Use JMH for accurate performance measurements:
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
@State(Scope.Benchmark)
public class StructuredConcurrencyBenchmark {
private StructuredParallelTransform transform;
private double[][] signals;
@Setup
public void setup() {
ParallelConfig config = ParallelConfig.builder()
.targetCores(Runtime.getRuntime().availableProcessors())
.minParallelThreshold(512)
.build();
transform = new StructuredParallelTransform(
Daubechies.DB4, PaddingStrategies.PERIODIC, config);
signals = generateTestSignals(100, 1024);
}
@Benchmark
public MODWTResult[] benchmarkParallelTransform() {
return transform.forwardBatch(signals);
}
// @Param must annotate a non-final field of the @State class, not a benchmark method or its arguments
@Param({"128", "256", "512", "1024", "2048"})
public int threshold;
@Benchmark
public MODWTResult[] benchmarkThresholdSweep() {
// Note: building the transform here is included in the measured time
ParallelConfig config = ParallelConfig.builder()
.minParallelThreshold(threshold)
.build();
StructuredParallelTransform thresholdTransform = new StructuredParallelTransform(
Daubechies.DB4, PaddingStrategies.PERIODIC, config);
return thresholdTransform.forwardBatch(signals);
}
}
Memory Profiling
// Monitor memory usage during batch processing
MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
long beforeMemory = memoryBean.getHeapMemoryUsage().getUsed();
try (var executor = new StructuredExecutor(config)) {
List<MODWTResult> results = executor.invokeAll(signals,
signal -> transform.forward(signal));
long afterMemory = memoryBean.getHeapMemoryUsage().getUsed();
long memoryUsed = afterMemory - beforeMemory;
System.out.printf("Memory usage: %.2f MB for %d signals%n",
memoryUsed / (1024.0 * 1024.0), signals.size());
}
Hardware Considerations
CPU Architecture Optimization
x86-64 Systems
// Optimize for x86 systems with AVX2/AVX512
ParallelConfig x86Config = ParallelConfig.builder()
.targetCores(Runtime.getRuntime().availableProcessors())
.minParallelThreshold(256) // Take advantage of wide SIMD
.maxChunkSize(8192) // Larger chunks for vector processing
.build();
ARM Systems (Apple Silicon, AWS Graviton)
// Optimize for ARM systems with NEON
ParallelConfig armConfig = ParallelConfig.builder()
.targetCores(Runtime.getRuntime().availableProcessors())
.minParallelThreshold(512) // NEON has narrower SIMD than AVX512
.maxChunkSize(4096)
.build();
NUMA Awareness
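For NUMA systems, consider limiting worker threads to the cores of a single node:
// Heuristic only: the JVM does not expose NUMA topology, so a high core count is used as a rough proxy
boolean isNUMASystem = Runtime.getRuntime().availableProcessors() > 16;
if (isNUMASystem) {
// Assume two NUMA nodes and size the pool to one node's cores to reduce remote-memory latency
int coresPerNode = Runtime.getRuntime().availableProcessors() / 2;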
ParallelConfig numaConfig = ParallelConfig.builder()
.targetCores(coresPerNode)
.minParallelThreshold(1024) // Higher threshold for NUMA
.build();
}
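The core-count check above is only a proxy. On Linux the node count can be read from sysfs instead, as in the sketch below. NumaProbe and its methods are hypothetical, the sysfs path is Linux-specific, and on other platforms the probe simply reports one node.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

/** Hypothetical Linux NUMA probe (not VectorWave API). */
final class NumaProbe {
    /** Counts nodeN directories under the given root; returns 1 if the root is unavailable. */
    static int countNumaNodes(Path nodeRoot) {
        if (!Files.isDirectory(nodeRoot)) {
            return 1;
        }
        try (Stream<Path> entries = Files.list(nodeRoot)) {
            long nodes = entries
                    .filter(Files::isDirectory)
                    .map(p -> p.getFileName().toString())
                    .filter(name -> name.matches("node\\d+"))
                    .count();
            return (int) Math.max(1, nodes);
        } catch (IOException | RuntimeException e) {
            return 1;
        }
    }

    /** Splits the processor count evenly across nodes, never returning less than 1. */
    static int coresPerNode(int availableProcessors, int nodeCount) {
        return Math.max(1, availableProcessors / Math.max(1, nodeCount));
    }

    public static void main(String[] args) {
        int nodes = countNumaNodes(Path.of("/sys/devices/system/node"));
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println(nodes + " NUMA node(s), " + coresPerNode(cores, nodes) + " cores per node");
    }
}
```

Sizing the pool to one node's cores does not pin threads to that node; actual placement still depends on the operating system scheduler or on external tools such as numactl.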
Memory Hierarchy Optimization
Cache-Aware Processing
// Estimate cache sizes and configure accordingly
long l3CacheSize = estimateL3CacheSize(); // Platform-specific estimation
int optimalChunkSize = (int) Math.min(
8192,
l3CacheSize / (Double.BYTES * 8) // Account for coefficient working set
);
ParallelConfig cacheConfig = ParallelConfig.builder()
.maxChunkSize(optimalChunkSize)
.build();
Prefetching Strategies
// Configure for different memory access patterns
ParallelConfig prefetchConfig = switch (accessPattern) {
case SEQUENTIAL -> ParallelConfig.builder()
.maxChunkSize(8192) // Large chunks for sequential access
.build();
case RANDOM -> ParallelConfig.builder()
.maxChunkSize(2048) // Smaller chunks for random access
.build();
case STRIDED -> ParallelConfig.builder()
.maxChunkSize(4096) // Medium chunks for strided access
.build();
};
Common Performance Issues
Issue 1: Over-Parallelization
Symptoms:
- Parallel processing slower than sequential
- High CPU usage with low throughput
- Thread contention in profiler
Diagnosis:
// Enable debug mode to see threshold decisions
System.setProperty("debug.tuner", "true");
// Monitor thread utilization
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
long[] threadIds = threadBean.getAllThreadIds();
System.out.println("Active threads: " + threadIds.length);
Solutions:
// Increase threshold to reduce parallelization
ParallelConfig fixedConfig = ParallelConfig.builder()
.minParallelThreshold(2048) // Higher threshold
.targetCores(Math.min(4, Runtime.getRuntime().availableProcessors()))
.build();
// Or use adaptive tuning
AdaptiveThresholdTuner tuner = new AdaptiveThresholdTuner();
int adaptiveThreshold = tuner.getAdaptiveThreshold(operationType, dataSize, 1.0);
Issue 2: Memory Pressure
Symptoms:
- Increasing garbage collection frequency
- OutOfMemoryError exceptions
- Degraded performance over time
Diagnosis:
// Monitor memory usage patterns
MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
long heapUsed = memoryBean.getHeapMemoryUsage().getUsed();
long heapMax = memoryBean.getHeapMemoryUsage().getMax();
double heapUtilization = (double) heapUsed / heapMax;
if (heapUtilization > 0.8) {
System.out.println("WARNING: High heap utilization: " + heapUtilization);
}
Solutions:
// Process in smaller batches
int batchSize = Math.min(originalBatchSize,
(int) (availableMemory / estimatedSignalMemory));
for (int i = 0; i < signals.length; i += batchSize) {
double[][] batch = Arrays.copyOfRange(signals, i,
Math.min(i + batchSize, signals.length));
try (var executor = new StructuredExecutor(config)) {
List<MODWTResult> batchResults = executor.invokeAll(Arrays.asList(batch),
signal -> transform.forward(signal));
// Process results immediately
}
// Optional: suggest GC between batches for large datasets
if (batchSize < originalBatchSize) {
System.gc();
}
}
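The batching loop relies on availableMemory and estimatedSignalMemory, which are left undefined. One rough way to derive them from the JVM's own counters is sketched below. BatchSizing is hypothetical, the arraysPerSignal parameter (input plus coefficient arrays held per signal) is an assumption you would need to calibrate, and the estimate ignores object headers and temporary allocations.

```java
/** Hypothetical batch sizing from heap counters (not VectorWave API). */
final class BatchSizing {
    /** Rough heap still obtainable by the JVM: maximum heap minus what is currently in use. */
    static long availableHeapBytes() {
        Runtime rt = Runtime.getRuntime();
        long used = rt.totalMemory() - rt.freeMemory();
        return rt.maxMemory() - used;
    }

    /** Bytes held per signal, assuming arraysPerSignal double arrays of signalLength each. */
    static long estimateSignalBytes(int signalLength, int arraysPerSignal) {
        return (long) signalLength * Double.BYTES * arraysPerSignal;
    }

    /** Largest batch that fits in headroom * availableBytes, clamped to [1, requestedBatch]. */
    static int safeBatchSize(int requestedBatch, long availableBytes, long bytesPerSignal, double headroom) {
        long budget = (long) (availableBytes * headroom);
        long fit = budget / Math.max(1, bytesPerSignal);
        return (int) Math.max(1, Math.min(requestedBatch, fit));
    }

    public static void main(String[] args) {
        long perSignal = estimateSignalBytes(1024, 3);
        int batch = safeBatchSize(1000, availableHeapBytes(), perSignal, 0.5);
        System.out.println("Batch size: " + batch);
    }
}
```

A headroom factor of 0.5 leaves half of the free heap for everything else; tighten or relax it based on what the garbage collector logs show.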
Issue 3: Timeout Issues
Symptoms:
- ComputationException with timeout messages
- Inconsistent completion times
- Tasks completing after timeout
Solutions:
// Use adaptive timeout based on workload
long estimatedTime = estimateProcessingTime(signals);
long adaptiveTimeout = Math.max(
estimatedTime * 2, // 2x buffer
5000 // Minimum 5 second timeout
);
// Implement graceful degradation
try {
return transform.forwardBatchWithTimeout(signals, adaptiveTimeout);
} catch (StructuredParallelTransform.ComputationException e) {
if (e.getMessage() != null && e.getMessage().contains("timeout")) {
// Fall back to smaller batch or sequential processing
return fallbackProcessing(signals);
}
throw e;
}
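Matching on exception message text is fragile. The same graceful-degradation pattern can be expressed with plain java.util.concurrent types, where a missed deadline surfaces as a TimeoutException. The sketch below is generic and does not use VectorWave classes; DeadlineFallback and runWithDeadline are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Generic deadline-with-fallback helper built on the JDK only (not VectorWave API). */
final class DeadlineFallback {
    /** Runs primary with a deadline; on timeout, cancels it and returns the fallback's result. */
    static <T> T runWithDeadline(Supplier<T> primary, Supplier<T> fallback, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<T> task = primary::get;
            Future<T> future = executor.submit(task);
            try {
                return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                // Deadline missed: interrupt the primary task and degrade gracefully
                future.cancel(true);
                return fallback.get();
            } catch (ExecutionException e) {
                // The primary task itself failed; surface the original cause
                throw new IllegalStateException("Primary task failed", e.getCause());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for result", e);
            }
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        String result = runWithDeadline(() -> "parallel", () -> "sequential", 1000);
        System.out.println("Used path: " + result);
    }
}
```

Cancellation here is cooperative: the primary task only stops early if it responds to interruption, which is why tasks can still appear to complete after the timeout.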
Benchmarking Guidelines
Establishing Baselines
// Sequential baseline
long sequentialStart = System.nanoTime();
List<MODWTResult> sequentialResults = new ArrayList<>();
for (double[] signal : signals) {
sequentialResults.add(transform.forward(signal));
}
long sequentialTime = System.nanoTime() - sequentialStart;
// Parallel implementation
long parallelStart = System.nanoTime();
List<MODWTResult> parallelResults = executor.invokeAll(signals,
signal -> transform.forward(signal));
long parallelTime = System.nanoTime() - parallelStart;
// Calculate speedup
double speedup = (double) sequentialTime / parallelTime;
double efficiency = speedup / Runtime.getRuntime().availableProcessors();
System.out.printf("Speedup: %.2fx, Efficiency: %.2f%%%n",
speedup, efficiency * 100);
Comprehensive Performance Testing
public class PerformanceSuite {
@Test
public void benchmarkScaling() {
int[] signalCounts = {1, 5, 10, 25, 50, 100, 200, 500};
int[] signalLengths = {256, 512, 1024, 2048, 4096};
for (int count : signalCounts) {
for (int length : signalLengths) {
double[][] signals = generateTestSignals(count, length);
// Measure performance
long startTime = System.nanoTime();
MODWTResult[] results = transform.forwardBatch(signals);
long elapsed = System.nanoTime() - startTime;
double throughput = (count * 1e9) / elapsed;
System.out.printf("Count: %d, Length: %d, Throughput: %.1f signals/sec%n",
count, length, throughput);
}
}
}
@Test
public void benchmarkThresholdSweep() {
int[] thresholds = {64, 128, 256, 512, 1024, 2048, 4096};
double[][] signals = generateTestSignals(100, 1024);
for (int threshold : thresholds) {
ParallelConfig config = ParallelConfig.builder()
.minParallelThreshold(threshold)
.build();
StructuredParallelTransform testTransform = new StructuredParallelTransform(
Daubechies.DB4, PaddingStrategies.PERIODIC, config);
// Warm up
for (int i = 0; i < 5; i++) {
testTransform.forwardBatch(signals);
}
// Measure
long startTime = System.nanoTime();
for (int i = 0; i < 10; i++) {
testTransform.forwardBatch(signals);
}
long elapsed = System.nanoTime() - startTime;
double avgTime = elapsed / (10.0 * 1e6); // ms
System.out.printf("Threshold: %d, Avg Time: %.2f ms%n", threshold, avgTime);
}
}
}
Performance Regression Testing
// Automated performance regression detection
public void testPerformanceRegression() {
double[][] signals = generateTestSignals(100, 1024);
// Establish baseline (store in version control or database)
double baselineTime = getBaselineTime("structured_parallel_forward_batch");
// Current performance
long startTime = System.nanoTime();
MODWTResult[] results = transform.forwardBatch(signals);
long elapsed = System.nanoTime() - startTime;
double currentTime = elapsed / 1e6; // ms
// Check for regression (more than 10% slower)
double regressionThreshold = 1.1;
if (currentTime > baselineTime * regressionThreshold) {
fail(String.format("Performance regression detected: %.2f ms > %.2f ms (%.1f%% slower)",
currentTime, baselineTime,
((currentTime / baselineTime) - 1) * 100));
}
// Update baseline if significantly faster
if (currentTime < baselineTime * 0.95) {
updateBaseline("structured_parallel_forward_batch", currentTime);
}
}
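A single timed run, as in the regression test above, is sensitive to JIT warm-up and scheduling noise. A more stable figure can be had by warming up first and taking the median of several runs. The helper below is a sketch using only the JDK; StableTiming and its methods are hypothetical, and for publishable numbers JMH remains the better tool.

```java
import java.util.Arrays;

/** Hypothetical timing helpers for regression checks (not VectorWave API). */
final class StableTiming {
    /** Median of the given values; the input array is not modified. */
    static double median(double[] values) {
        if (values.length == 0) {
            throw new IllegalArgumentException("no samples");
        }
        double[] sorted = values.clone();
        Arrays.sort(sorted);
        int mid = sorted.length / 2;
        return (sorted.length % 2 == 1) ? sorted[mid] : (sorted[mid - 1] + sorted[mid]) / 2.0;
    }

    /** Runs the task warmupRuns times unmeasured, then returns the median of measuredRuns timings in ms. */
    static double medianMillis(Runnable task, int warmupRuns, int measuredRuns) {
        for (int i = 0; i < warmupRuns; i++) {
            task.run();
        }
        double[] samples = new double[measuredRuns];
        for (int i = 0; i < measuredRuns; i++) {
            long start = System.nanoTime();
            task.run();
            samples[i] = (System.nanoTime() - start) / 1e6;
        }
        return median(samples);
    }

    /** True when the current time exceeds the baseline by more than the tolerance factor. */
    static boolean isRegression(double currentMs, double baselineMs, double tolerance) {
        return currentMs > baselineMs * tolerance;
    }
}
```

In the regression test, currentTime could then be computed as medianMillis(() -> transform.forwardBatch(signals), 5, 10) before the comparison against the baseline.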
Conclusion
Tuning structured concurrency performance requires understanding both your workload characteristics and the underlying hardware. Key strategies include:
- Start with appropriate defaults based on workload type
- Use adaptive tuning for automatic optimization
- Monitor performance continuously during development
- Profile systematically to identify bottlenecks
- Test across different hardware configurations
- Establish regression testing to maintain performance
The structured concurrency implementation in VectorWave provides both automatic optimizations and fine-grained control for performance-critical applications. Combine built-in adaptive tuning with workload-specific configuration for optimal results.
SIMD Integration (Extensions)
SIMD optimizations via the Java Vector API are available in the optional vectorwave-extensions module (Java 25 + incubator). Core paths are scalar Java 25. To enable SIMD:
--add-modules jdk.incubator.vector --enable-preview
- Complex spectra are stored as interleaved primitives (re0, im0, re1, im1, …) to keep SIMD loads unit-stride; skip copying into separate real/imag buffers when chasing throughput.
- Frequency-domain multiplies use fused vector FMA instructions; mismatched lengths fall back to the scalar path automatically.
- Overlap-save windows and accumulation buffers rely on the same layout, so keep block copies contiguous for best results.
- Structured CWT executors emit [scale][time] tiles in contiguous float rows; align downstream consumers with that layout to avoid extra transposes.
- Streaming builds reuse head/tail ring buffers to advance windows without per-hop allocations; feed new samples, not whole blocks.
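To illustrate the interleaved layout described in this section, here is a scalar reference for an element-wise complex multiply over (re0, im0, re1, im1, …) arrays. It deliberately avoids the Vector API so that it runs without extra flags, and it is not the extensions module's implementation; InterleavedComplex is a hypothetical name and double precision is an assumption.

```java
/** Scalar reference for interleaved complex multiplication (not VectorWave API). */
final class InterleavedComplex {
    /** Multiplies two interleaved complex arrays element-wise: out[k] = a[k] * b[k]. */
    static void multiply(double[] a, double[] b, double[] out) {
        if (a.length != b.length || a.length != out.length || a.length % 2 != 0) {
            throw new IllegalArgumentException("arrays must have equal, even length");
        }
        for (int i = 0; i < a.length; i += 2) {
            // Read both operands before writing so out may alias a or b
            double ar = a[i], ai = a[i + 1];
            double br = b[i], bi = b[i + 1];
            out[i] = ar * br - ai * bi;     // real part
            out[i + 1] = ar * bi + ai * br; // imaginary part
        }
    }

    public static void main(String[] args) {
        double[] a = {1, 2};   // 1 + 2i
        double[] b = {3, 4};   // 3 + 4i
        double[] out = new double[2];
        multiply(a, b, out);
        System.out.println(out[0] + " + " + out[1] + "i");
    }
}
```

Because real and imaginary parts sit next to each other, the loop walks each array once with unit stride, which is the access pattern the bullets above recommend preserving.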