UniqSeq API
API reference for the UniqSeq class - the core deduplication engine.
Overview
The UniqSeq class provides the core deduplication algorithm. It processes lines one at a time in streaming fashion and, with the default history and sequence limits, keeps memory usage bounded regardless of input size.
Key Features
- Streaming Processing: Process unlimited input with fixed memory
- Position-Aware Matching: Tracks where sequences occur for accurate duplicate detection
- Configurable Window Size: Detect sequences of any length
- Pattern Filtering: Include or exclude lines based on regex patterns
- Library Support: Load and save known sequence patterns
- Inverse Mode: Isolate repeated patterns for analysis
- Annotation Support: Mark where duplicates were removed
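To make the idea of position-aware window matching concrete, here is a minimal sketch that uses only the standard library. It is not the UniqSeq implementation: the helper names `window_hash` and `find_repeated_windows` are hypothetical, and the sketch ignores overlap handling, eviction, and candidate tracking. It hashes every window of `window_size` lines and records where each hash was first seen.

```python
import hashlib


def window_hash(lines: list[str]) -> str:
    """Hash a window of lines; the separator keeps ["ab", "c"] distinct from ["a", "bc"]."""
    joined = "\n".join(lines).encode("utf-8")
    return hashlib.blake2b(joined, digest_size=16).hexdigest()


def find_repeated_windows(lines: list[str], window_size: int) -> list[tuple[int, int]]:
    """Return (start, first_seen_start) for every window that repeats an earlier one."""
    first_seen: dict[str, int] = {}
    repeats: list[tuple[int, int]] = []
    for start in range(len(lines) - window_size + 1):
        digest = window_hash(lines[start:start + window_size])
        if digest in first_seen:
            # Remember WHERE the earlier occurrence started, not just that it exists
            repeats.append((start, first_seen[digest]))
        else:
            first_seen[digest] = start
    return repeats


print(find_repeated_windows(["A", "B", "C", "A", "B", "C"], window_size=3))
# [(3, 0)]
```

The window starting at line index 3 repeats the one starting at index 0, which is the kind of positional information a sequence deduplicator needs in order to decide which lines to drop.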
Class Reference
uniqseq.uniqseq.UniqSeq
Streaming line sequence deduplicator with context-aware matching.
Tracks WHERE sequences occur to enable proper duplicate detection.
__init__(window_size=MIN_SEQUENCE_LENGTH, max_history=DEFAULT_MAX_HISTORY, max_unique_sequences=DEFAULT_MAX_UNIQUE_SEQUENCES, max_candidates=DEFAULT_MAX_CANDIDATES, skip_chars=0, hash_transform=None, delimiter='\n', preloaded_sequences=None, save_sequence_callback=None, filter_patterns=None, inverse=False, annotate=False, annotation_format=None, explain=False)
Initialize the deduplicator.
Args:
window_size: Minimum sequence length to detect (default: 10)
max_history: Maximum window hash history (default: 100000), or None for unlimited
max_unique_sequences: Maximum unique sequences to track (default: 10000),
or None for unlimited
max_candidates: Maximum concurrent candidates to track (default: 100),
or None for unlimited. Lower values improve performance but may
miss some patterns.
skip_chars: Number of characters to skip from line start when hashing (default: 0)
hash_transform: Optional function to transform line before hashing (default: None)
Function receives line (str or bytes) and returns transformed line
(str or bytes). Must return exactly one line per input
(no filtering/splitting)
delimiter: Delimiter to use when writing output (default: "\n"). Should be str for text mode, bytes for binary mode
preloaded_sequences: Optional set of sequence_content strings/bytes to treat as "already seen". These sequences are skipped on first observation and have unlimited retention (never evicted)
save_sequence_callback: Optional callback(file_content) called when a sequence should be saved to library. Receives the raw file content (with delimiters). The callback computes its own hash.
filter_patterns: Optional list of FilterPattern objects for sequential pattern matching. Patterns are evaluated in order; first match determines action. "track" = include for deduplication, "bypass" = pass through unchanged.
inverse: If True, inverse mode: keep duplicates, remove unique sequences (default: False)
annotate: If True, add inline markers showing where duplicates were skipped (default: False)
annotation_format: Custom annotation template string. Variables: {start}, {end}, {match_start}, {match_end}, {count}, {window_size} (default: None)
explain: If True, output explanations to stderr showing why lines were kept or skipped (default: False)
flush()
Emit remaining buffered lines to output buffer at EOF.
flush_to_stream(output=sys.stdout)
Flush remaining buffered lines to a stream (backward compatibility wrapper).
For new code, prefer using process_lines() iterator which handles flushing automatically.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| output | Union[TextIO, BinaryIO] | Output stream (default: stdout) | stdout |
get_stats()
Get deduplication statistics.
Returns:
| Type | Description |
|---|---|
| dict[str, Union[int, float]] | Dictionary with keys: total, emitted, skipped, redundancy_pct, unique_sequences |
process_line(line, output=sys.stdout, progress_callback=None)
Process a single line, writing output to a stream (backward compatibility wrapper).
For new code, prefer using process_lines() iterator which is more Pythonic.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| line | Union[str, bytes] | Line to process (without trailing newline/delimiter, str or bytes) | required |
| output | Union[TextIO, BinaryIO] | Output stream (default: stdout) | stdout |
| progress_callback | Optional[Callable[[int, int, int], None]] | Optional callback(line_num, lines_skipped, seq_count) called every 1000 lines with current statistics | None |
process_lines(lines, progress_callback=None)
Process lines through duplicate detection, yielding non-duplicate lines.
This is the preferred Pythonic API for using UniqSeq. It processes an iterable of lines and yields lines that should be output (non-duplicates in normal mode, duplicates in inverse mode).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| lines | Iterable[Union[str, bytes]] | Iterable of lines to process (without trailing newline/delimiter) | required |
| progress_callback | Optional[Callable[[int, int, int], None]] | Optional callback(line_num, lines_skipped, seq_count) called every 1000 lines with current statistics | None |
Yields:
| Type | Description |
|---|---|
| Union[str, bytes] | Lines that pass deduplication (str or bytes matching input type) |
Example
from uniqseq import UniqSeq
deduplicator = UniqSeq(window_size=3)
input_lines = ["A", "B", "C", "A", "B", "C"]
output = list(deduplicator.process_lines(input_lines))
print(output)
# ['A', 'B', 'C']
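Because process_lines() expects lines without their trailing delimiter and yields lines the same way, a small adapter is usually needed when reading from and writing to streams. The following is a sketch under that assumption; `stripped_lines` and `dedupe_stream` are hypothetical helper names, not part of uniqseq, and the `deduplicator` argument is any object exposing process_lines(), such as a UniqSeq instance.

```python
import io
from typing import Iterator, TextIO


def stripped_lines(stream: TextIO) -> Iterator[str]:
    """Yield lines without their trailing newline, as process_lines() expects."""
    for line in stream:
        yield line.rstrip("\n")


def dedupe_stream(deduplicator, source: TextIO, sink: TextIO, delimiter: str = "\n") -> int:
    """Pipe source through deduplicator.process_lines() into sink.

    Returns the number of lines written. No explicit flush call is made here,
    because the reference states that process_lines() handles flushing itself.
    """
    written = 0
    for line in deduplicator.process_lines(stripped_lines(source)):
        sink.write(line + delimiter)
        written += 1
    return written
```

With uniqseq installed, a call might look like `dedupe_stream(UniqSeq(window_size=10), sys.stdin, sys.stdout)`.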
Basic Usage
Simple Deduplication
from uniqseq import UniqSeq
import sys
# Create uniqseq with default settings
uniqseq = UniqSeq(window_size=10)
# Process lines from stdin
for line in sys.stdin:
    line = line.rstrip('\n')  # Remove newline
    uniqseq.process_line(line, sys.stdout)
# Flush remaining buffer (flush() takes no stream argument; flush_to_stream() does)
uniqseq.flush_to_stream(sys.stdout)
# Get statistics
stats = uniqseq.get_stats()
print(
f"Processed {stats['total']} lines, skipped {stats['skipped']}",
file=sys.stderr
)
Custom Configuration
from uniqseq import UniqSeq
uniqseq = UniqSeq(
window_size=5, # Detect 5-line sequences
max_history=50000, # Track up to 50k unique windows
max_candidates=50, # Limit concurrent candidates (faster)
skip_chars=21, # Skip timestamp prefix
)
# Process file
with open('input.log') as f:
    for line in f:
        line = line.rstrip('\n')
        uniqseq.process_line(line)
# Write the remaining buffered lines to stdout
uniqseq.flush_to_stream()
Performance Tuning
from uniqseq import UniqSeq
# Fast mode: good for large files where speed is critical
fast_uniqseq = UniqSeq(
window_size=10,
max_candidates=30, # Fewer candidates = faster
max_history=50000, # Smaller history = less memory
)
# Accurate mode: comprehensive analysis
accurate_uniqseq = UniqSeq(
window_size=10,
max_candidates=None, # Unlimited = catches all patterns
max_history=None, # Unlimited = complete history
)
# Balanced mode (default): good for most use cases
balanced_uniqseq = UniqSeq(
window_size=10,
max_candidates=100, # Default: balanced performance
max_history=100000, # Default: reasonable memory
)
Advanced Features
Pattern Filtering
import re
from uniqseq.uniqseq import UniqSeq, FilterPattern
# Create filter patterns
patterns = [
FilterPattern(
pattern=r'^ERROR',
action='track',
regex=re.compile(r'^ERROR')
),
FilterPattern(
pattern=r'^WARN',
action='bypass',
regex=re.compile(r'^WARN')
),
]
uniqseq = UniqSeq(
window_size=10,
filter_patterns=patterns
)
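The constructor documentation says patterns are evaluated in order and the first match determines the action. The sketch below illustrates that rule with plain (regex, action) pairs instead of FilterPattern objects. The helper `resolve_action` is hypothetical, the use of `search` rather than `match` is an assumption, and so is the `default` applied to lines that match no pattern, since this reference does not state what happens to them.

```python
import re

# Hypothetical stand-in for a filter list: (compiled regex, action) pairs
Rules = list[tuple[re.Pattern[str], str]]


def resolve_action(line: str, rules: Rules, default: str = "track") -> str:
    """Return the action of the first rule whose regex matches the line."""
    for regex, action in rules:
        if regex.search(line):
            return action
    # Assumption: behaviour for unmatched lines is not documented here
    return default


rules: Rules = [
    (re.compile(r"^ERROR"), "track"),
    (re.compile(r"^WARN"), "bypass"),
    (re.compile(r"disk"), "track"),
]

print(resolve_action("ERROR disk full", rules))  # track
print(resolve_action("WARN disk low", rules))    # bypass
```

The second line matches both the `^WARN` and the `disk` rule, but `^WARN` comes first, so its action wins. Ordering the list from most specific to most general is therefore the safer habit.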
Hash Transformation
def extract_log_message(line: str) -> str:
    """Extract just the message part of a log line."""
    # Skip timestamp and log level, keep only message
    parts = line.split(maxsplit=3)
    return parts[3] if len(parts) > 3 else line
uniqseq = UniqSeq(
window_size=10,
hash_transform=extract_log_message
)
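To see what such a transform does to the text that gets hashed, the function can be tried on a few sample lines by itself. The log lines below are made up and assume a "date time level message" layout.

```python
def extract_log_message(line: str) -> str:
    """Extract just the message part of a log line."""
    parts = line.split(maxsplit=3)
    return parts[3] if len(parts) > 3 else line


first = "2024-01-15 10:30:01 INFO Connection reset by peer"
second = "2024-01-15 10:31:47 WARN Connection reset by peer"
short = "heartbeat"

print(extract_log_message(first))                                  # Connection reset by peer
print(extract_log_message(first) == extract_log_message(second))   # True
print(extract_log_message(short))                                  # heartbeat
```

Note that this transform also discards the log level, so an INFO line and a WARN line with the same message compare equal. Lines with fewer than four whitespace-separated fields are returned unchanged, which keeps the one-line-in, one-line-out contract required of hash_transform.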
Library Mode
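import hashlib
from pathlib import Path
from uniqseq import UniqSeq
def save_callback(file_content: str) -> None:
    """Save a discovered sequence to disk."""
    output_dir = Path('~/sequences').expanduser()
    output_dir.mkdir(parents=True, exist_ok=True)
    # The callback computes its own hash; SHA-256 is an illustrative choice
    seq_hash = hashlib.sha256(file_content.encode('utf-8')).hexdigest()
    filepath = output_dir / f'{seq_hash}.txt'
    # file_content is the raw sequence, delimiters included
    filepath.write_text(file_content)
uniqseq = UniqSeq(
window_size=10,
save_sequence_callback=save_callback
)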
Preloaded Sequences
# Load known patterns to skip on first occurrence.
# preloaded_sequences is a set of sequence contents, not a hash-to-content mapping.
preloaded = {
    'Line 1\nLine 2\nLine 3',
    'Error A\nError B\nError C',
}
uniqseq = UniqSeq(
window_size=3,
preloaded_sequences=preloaded
)
Inverse Mode
# Keep only duplicates (for pattern analysis)
uniqseq = UniqSeq(
window_size=10,
inverse=True
)
# Process lines - will output only repeated sequences
for line in input_lines:
    uniqseq.process_line(line, sys.stdout)
uniqseq.flush_to_stream(sys.stdout)
Annotations
# Add markers where duplicates were skipped
uniqseq = UniqSeq(
window_size=10,
annotate=True,
annotation_format='[SKIP: Lines {start}-{end}, seen {count}x]'
)
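The braces in annotation_format suggest Python's str.format-style substitution. Assuming that is how the template is filled in, the sketch below renders a template with made-up values and checks a template for variable names outside the documented set. The helper `unknown_variables` is hypothetical, and the sample values are only illustrative, since this reference does not spell out the exact meaning of each variable.

```python
from string import Formatter

# Variable names listed for annotation_format in the constructor documentation
ALLOWED = {"start", "end", "match_start", "match_end", "count", "window_size"}


def unknown_variables(template: str) -> set[str]:
    """Return the placeholder names in template that are not documented variables."""
    names = {name for _, name, _, _ in Formatter().parse(template) if name}
    return names - ALLOWED


template = "[SKIP: Lines {start}-{end}, seen {count}x]"
sample = {"start": 11, "end": 20, "match_start": 1, "match_end": 10,
          "count": 2, "window_size": 10}

print(template.format(**sample))                 # [SKIP: Lines 11-20, seen 2x]
print(unknown_variables("[dup {start}-{stop}]")) # {'stop'}
```

Checking a template up front like this catches a misspelled variable before any input is processed.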
Memory Management
History Limits
The max_history parameter controls memory usage:
# Limited history (bounded memory)
uniqseq = UniqSeq(
window_size=10,
max_history=10000 # Track last 10k windows
)
# Unlimited history (for file processing)
uniqseq = UniqSeq(
window_size=10,
max_history=None # No limit
)
Memory usage:
- Each window hash: ~100 bytes
- 10,000 windows ≈ 1 MB
- 100,000 windows ≈ 10 MB
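The figures above follow from simple multiplication, which can be used to size max_history for a given memory budget. This is a back-of-the-envelope sketch only: the ~100 bytes per window hash is the rough figure quoted above, not a measurement, and `estimate_history_mb` is a hypothetical helper.

```python
BYTES_PER_WINDOW_HASH = 100  # rough per-entry figure quoted in this section


def estimate_history_mb(max_history: int) -> float:
    """Estimate history memory in megabytes (1 MB = 1,000,000 bytes)."""
    return max_history * BYTES_PER_WINDOW_HASH / 1_000_000


for windows in (10_000, 100_000, 1_000_000):
    print(f"{windows:>9,} windows ~ {estimate_history_mb(windows):.0f} MB")
```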
Unique Sequence Limits
The max_unique_sequences parameter limits unique patterns tracked:
uniqseq = UniqSeq(
window_size=10,
max_history=100000,
max_unique_sequences=5000 # Track up to 5k unique patterns
)
When the limit is reached, the least recently used (LRU) sequences are evicted. Preloaded sequences are exempt and are never evicted.
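For readers unfamiliar with LRU eviction, the following toy container shows the policy using collections.OrderedDict. It is an illustration of the general technique, not the data structure UniqSeq uses internally, and the class name `LRUStore` is hypothetical.

```python
from collections import OrderedDict


class LRUStore:
    """Toy bounded store that evicts the least recently used entry when full."""

    def __init__(self, max_size: int) -> None:
        self.max_size = max_size
        self._items = OrderedDict()

    def touch(self, key: str, value: str) -> None:
        """Insert or refresh an entry, then evict the stalest one if over the limit."""
        if key in self._items:
            self._items.move_to_end(key)  # mark as most recently used
        self._items[key] = value
        if len(self._items) > self.max_size:
            self._items.popitem(last=False)  # drop the least recently used entry

    def keys(self) -> list[str]:
        return list(self._items)


store = LRUStore(max_size=2)
store.touch("a", "seq A")
store.touch("b", "seq B")
store.touch("a", "seq A")  # "a" was seen again, so "b" is now the stalest
store.touch("c", "seq C")  # exceeds the limit: "b" is evicted
print(store.keys())        # ['a', 'c']
```

The practical consequence is that a sequence that keeps recurring stays tracked, while one that has not been seen for a long time may be forgotten and later treated as new.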
Statistics
get_stats()
Returns deduplication statistics:
stats = uniqseq.get_stats()
print(f"Total lines: {stats['total']}")
print(f"Emitted: {stats['emitted']}")
print(f"Skipped: {stats['skipped']}")
print(f"Redundancy: {stats['redundancy_pct']:.1f}%")
print(f"Unique sequences: {stats['unique_sequences']}")
Return value:
{
'total': int, # Total lines processed
'emitted': int, # Lines written to output
'skipped': int, # Lines skipped as duplicates
'redundancy_pct': float, # Percentage of duplicates
'unique_sequences': int # Number of unique patterns found
}
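The relationship between the fields can be sketched as follows. The formula for redundancy_pct is an assumption (skipped lines as a percentage of total lines, mirroring the arithmetic used in the progress callback example), and both helpers and the sample values are hypothetical.

```python
def redundancy_pct(total: int, skipped: int) -> float:
    """Assumed definition: skipped lines as a percentage of all processed lines."""
    return 100.0 * skipped / total if total else 0.0


def summarize(stats: dict) -> str:
    """Format a get_stats()-shaped dictionary as a one-line summary."""
    return (
        f"{stats['emitted']}/{stats['total']} lines kept, "
        f"{stats['skipped']} skipped ({stats['redundancy_pct']:.1f}% redundant), "
        f"{stats['unique_sequences']} unique sequences"
    )


sample_stats = {
    "total": 1000,
    "emitted": 750,
    "skipped": 250,
    "redundancy_pct": redundancy_pct(1000, 250),
    "unique_sequences": 5,
}
print(summarize(sample_stats))
# 750/1000 lines kept, 250 skipped (25.0% redundant), 5 unique sequences
```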
Binary Mode
Process binary data with bytes instead of strings:
uniqseq = UniqSeq(
window_size=10,
delimiter=b'\n' # Use bytes delimiter
)
# Process binary lines
with open('input.bin', 'rb') as f:
    for line in f:
        line = line.rstrip(b'\n')
        uniqseq.process_line(line, sys.stdout.buffer)
uniqseq.flush_to_stream(sys.stdout.buffer)
Progress Callbacks
Monitor processing progress:
def progress_callback(
    line_num: int, lines_skipped: int, seq_count: int
) -> None:
    """Called every 1000 lines."""
    redundancy = 100 * lines_skipped / line_num if line_num > 0 else 0
    print(f"Processed {line_num:,} lines, {redundancy:.1f}% redundancy",
          file=sys.stderr)
uniqseq = UniqSeq(window_size=10)
for line in input_lines:
    uniqseq.process_line(line, sys.stdout, progress_callback=progress_callback)
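Since the callback fires every 1000 lines, very large inputs can produce a lot of progress output. One possible approach is to wrap the reporting logic in a factory that thins the reports and writes to a chosen stream. This is a sketch: `make_progress_reporter` is a hypothetical helper, and the loop at the end only simulates the library invoking the callback.

```python
import io
import sys
from typing import Callable, TextIO


def make_progress_reporter(
    stream: TextIO = sys.stderr, every: int = 10_000
) -> Callable[[int, int, int], None]:
    """Build a progress callback that reports only every `every` lines."""

    def report(line_num: int, lines_skipped: int, seq_count: int) -> None:
        if line_num == 0 or line_num % every != 0:
            return
        redundancy = 100 * lines_skipped / line_num
        stream.write(
            f"{line_num:,} lines, {redundancy:.1f}% redundancy, "
            f"{seq_count} sequences\n"
        )

    return report


# Simulate callbacks arriving every 1000 lines, reporting every 2000
buffer = io.StringIO()
report = make_progress_reporter(buffer, every=2000)
for n in (1000, 2000, 3000, 4000):
    report(n, n // 4, 3)
print(buffer.getvalue(), end="")
```

The returned function has the same (line_num, lines_skipped, seq_count) signature as the documented callback, so it can be passed as progress_callback unchanged.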
Performance Considerations
Window Size
- Smaller windows (5-10 lines): More sensitive, finds shorter patterns
- Larger windows (20-50 lines): Less sensitive, only finds longer patterns
- Rule of thumb: Set to the minimum pattern length you want to detect
History Size
- Limited history (10k-100k): Fixed memory, may miss old duplicates
- Unlimited history: Grows with unique patterns, best for files
- Auto-detection: CLI auto-enables unlimited for files, limited for streams
Skip Characters
When all you need is to ignore a fixed-length prefix, using skip_chars is more efficient than hash_transform:
# Efficient: skip characters during hashing
uniqseq = UniqSeq(skip_chars=21)
# Less efficient: transform entire line
uniqseq = UniqSeq(
hash_transform=lambda line: line[21:]
)
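Both options lead to the same comparison result for a fixed-width prefix, which the following standalone sketch demonstrates. `line_key` is a hypothetical stand-in for however the library derives a line's hash; the hash function and the order in which the two options are applied are assumptions, so the example only uses one option at a time. The sample lines are made up and carry a 21-character timestamp prefix.

```python
import hashlib
from typing import Callable, Optional


def line_key(
    line: str,
    skip_chars: int = 0,
    hash_transform: Optional[Callable[[str], str]] = None,
) -> str:
    """Hypothetical helper: derive the key used to compare two lines."""
    if hash_transform is not None:
        line = hash_transform(line)
    # Slicing is a no-op when skip_chars is 0
    return hashlib.sha256(line[skip_chars:].encode("utf-8")).hexdigest()


first = "2024-01-15T10:30:01Z Connection reset by peer"
second = "2024-01-15T10:31:47Z Connection reset by peer"

via_skip = line_key(first, skip_chars=21) == line_key(second, skip_chars=21)
via_transform = (
    line_key(first, hash_transform=lambda s: s[21:])
    == line_key(second, hash_transform=lambda s: s[21:])
)
print(via_skip, via_transform)  # True True
```

Without either option the two lines hash differently because of their timestamps. The skip_chars route avoids a Python-level function call per line, which is a plausible reason for the efficiency difference noted above.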
Concurrent Processing
Each UniqSeq instance maintains internal state for a single stream. For parallel processing, create separate instances per stream:
from concurrent.futures import ThreadPoolExecutor
from uniqseq import UniqSeq
def process_file(filepath):
    # Each worker gets its own UniqSeq instance
    uniqseq = UniqSeq(window_size=10)
    with open(filepath) as f:
        for line in f:
            uniqseq.process_line(line.rstrip('\n'))
    # Write the remaining buffered lines before reading the statistics
    uniqseq.flush_to_stream()
    return uniqseq.get_stats()
# Process multiple files in parallel
with ThreadPoolExecutor() as executor:
    results = list(executor.map(process_file, file_list))
Note: Do not share a single UniqSeq instance across threads. Each stream requires its own instance to maintain correct state.
See Also
- CLI Reference - Command-line interface
- Library Usage - Higher-level library functions
- Algorithm Details - How the algorithm works