UniqSeq API

API reference for the UniqSeq class - the core deduplication engine.

Overview

The UniqSeq class provides the core deduplication algorithm. It processes lines one at a time in streaming fashion, and with the default history limits its memory usage stays bounded regardless of input size.

Key Features

  • Streaming Processing: Process unlimited input with fixed memory
  • Position-Aware Matching: Tracks where sequences occur for accurate duplicate detection
  • Configurable Window Size: Detect sequences of any length
  • Pattern Filtering: Include or exclude lines based on regex patterns
  • Library Support: Load and save known sequence patterns
  • Inverse Mode: Isolate repeated patterns for analysis
  • Annotation Support: Mark where duplicates were removed

Class Reference

uniqseq.uniqseq.UniqSeq

Streaming line-sequence deduplicator with context-aware matching.

Tracks WHERE sequences occur to enable proper duplicate detection.

__init__(window_size=MIN_SEQUENCE_LENGTH, max_history=DEFAULT_MAX_HISTORY, max_unique_sequences=DEFAULT_MAX_UNIQUE_SEQUENCES, max_candidates=DEFAULT_MAX_CANDIDATES, skip_chars=0, hash_transform=None, delimiter='\n', preloaded_sequences=None, save_sequence_callback=None, filter_patterns=None, inverse=False, annotate=False, annotation_format=None, explain=False)

    Initialize uniqseq.

    Args:
        window_size: Minimum sequence length to detect (default: 10)
        max_history: Maximum window hash history (default: 100000), or None for unlimited
        max_unique_sequences: Maximum unique sequences to track (default: 10000),
                            or None for unlimited
        max_candidates: Maximum concurrent candidates to track (default: 100),
                      or None for unlimited. Lower values improve performance but may
                      miss some patterns.
        skip_chars: Number of characters to skip from line start when hashing (default: 0)
        hash_transform: Optional function to transform line before hashing (default: None)
                      Function receives line (str or bytes) and returns transformed line
                      (str or bytes). Must return exactly one line per input
                      (no filtering/splitting)
        delimiter: Delimiter to use when writing output (default: "\n")
                  Should be str for text mode, bytes for binary mode
        preloaded_sequences: Optional set of sequence_content strings/bytes to treat as
                           "already seen". These sequences are skipped on first
                           observation and have unlimited retention (never evicted)
        save_sequence_callback: Optional callback(file_content) called when a sequence
                              should be saved to library. Receives the raw file content
                              (with delimiters). The callback computes its own hash.
        filter_patterns: Optional list of FilterPattern objects for sequential pattern
                       matching. Patterns are evaluated in order; first match determines
                       action. "track" = include for deduplication, "bypass" = pass
                       through unchanged.
        inverse: If True, inverse mode: keep duplicates, remove unique sequences
                (default: False)
        annotate: If True, add inline markers showing where duplicates were skipped
                 (default: False)
        annotation_format: Custom annotation template string. Variables: {start}, {end},
                         {match_start}, {match_end}, {count}, {window_size}
                         (default: None)
        explain: If True, output explanations to stderr showing why lines were kept or
                skipped (default: False)

flush()

Emit remaining buffered lines to output buffer at EOF.

flush_to_stream(output=sys.stdout)

Flush remaining buffered lines to a stream (backward compatibility wrapper).

For new code, prefer using process_lines() iterator which handles flushing automatically.

Parameters:
    output (Union[TextIO, BinaryIO]): Output stream (default: stdout)

get_stats()

Get deduplication statistics.

Returns:
    dict[str, Union[int, float]]: Dictionary with keys: total, emitted, skipped, redundancy_pct, unique_sequences

process_line(line, output=sys.stdout, progress_callback=None)

Process a single line, writing output to a stream (backward compatibility wrapper).

For new code, prefer using process_lines() iterator which is more Pythonic.

Parameters:
    line (Union[str, bytes]): Line to process, without trailing newline/delimiter (required)
    output (Union[TextIO, BinaryIO]): Output stream (default: stdout)
    progress_callback (Optional[Callable[[int, int, int], None]]): Optional callback(line_num, lines_skipped, seq_count) called every 1000 lines with current statistics (default: None)

process_lines(lines, progress_callback=None)

Process lines through duplicate detection, yielding non-duplicate lines.

This is the preferred Pythonic API for using UniqSeq. It processes an iterable of lines and yields lines that should be output (non-duplicates in normal mode, duplicates in inverse mode).

Parameters:
    lines (Iterable[Union[str, bytes]]): Iterable of lines to process, without trailing newline/delimiter (required)
    progress_callback (Optional[Callable[[int, int, int], None]]): Optional callback(line_num, lines_skipped, seq_count) called every 1000 lines with current statistics (default: None)

Yields:
    Union[str, bytes]: Lines that pass deduplication (str or bytes, matching the input type)

Example

from uniqseq import UniqSeq

deduplicator = UniqSeq(window_size=3)
input_lines = ["A", "B", "C", "A", "B", "C"]
output = list(deduplicator.process_lines(input_lines))
print(output)  # ['A', 'B', 'C']

Basic Usage

Simple Deduplication

from uniqseq import UniqSeq
import sys

# Create uniqseq with default settings
uniqseq = UniqSeq(window_size=10)

# Process lines from stdin
for line in sys.stdin:
    line = line.rstrip('\n')  # Remove newline
    uniqseq.process_line(line, sys.stdout)

# Flush remaining buffer to the same stream (flush() itself takes no arguments)
uniqseq.flush_to_stream(sys.stdout)

# Get statistics
stats = uniqseq.get_stats()
print(
    f"Processed {stats['total']} lines, skipped {stats['skipped']}",
    file=sys.stderr
)

Custom Configuration

from uniqseq import UniqSeq

uniqseq = UniqSeq(
    window_size=5,              # Detect 5-line sequences
    max_history=50000,          # Track up to 50k unique windows
    max_candidates=50,          # Limit concurrent candidates (faster)
    skip_chars=21,              # Skip timestamp prefix
)

# Process file
with open('input.log') as f:
    for line in f:
        line = line.rstrip('\n')
        uniqseq.process_line(line)

uniqseq.flush_to_stream()  # Write remaining buffered lines to stdout

Performance Tuning

from uniqseq import UniqSeq

# Fast mode: good for large files where speed is critical
fast_uniqseq = UniqSeq(
    window_size=10,
    max_candidates=30,          # Fewer candidates = faster
    max_history=50000,          # Smaller history = less memory
)

# Accurate mode: comprehensive analysis
accurate_uniqseq = UniqSeq(
    window_size=10,
    max_candidates=None,        # Unlimited = catches all patterns
    max_history=None,           # Unlimited = complete history
)

# Balanced mode (default): good for most use cases
balanced_uniqseq = UniqSeq(
    window_size=10,
    max_candidates=100,         # Default: balanced performance
    max_history=100000,         # Default: reasonable memory
)

Advanced Features

Pattern Filtering

import re
from uniqseq.uniqseq import UniqSeq, FilterPattern

# Create filter patterns
patterns = [
    FilterPattern(
        pattern=r'^ERROR',
        action='track',
        regex=re.compile(r'^ERROR')
    ),
    FilterPattern(
        pattern=r'^WARN',
        action='bypass',
        regex=re.compile(r'^WARN')
    ),
]

uniqseq = UniqSeq(
    window_size=10,
    filter_patterns=patterns
)
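
The "first match determines action" rule can be pictured with a small standalone sketch. It does not use the library's FilterPattern type; the RULES list and the first_match_action helper are hypothetical names chosen for illustration, and what the library does with lines that match no pattern is not covered here.

```python
import re
from typing import Optional

# Hypothetical (action, compiled regex) pairs, in evaluation order.
RULES = [
    ("track", re.compile(r"^ERROR")),
    ("bypass", re.compile(r"^WARN")),
]

def first_match_action(line: str, rules=RULES) -> Optional[str]:
    """Return the action of the first rule whose regex matches, else None."""
    for action, regex in rules:
        if regex.search(line):
            return action
    return None  # no rule matched; the library's default is not modeled here

print(first_match_action("ERROR disk full"))  # track
print(first_match_action("WARN low memory"))  # bypass
print(first_match_action("INFO started"))     # None
```

Because evaluation stops at the first match, a broad pattern placed early in the list shadows any narrower pattern that follows it.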

Hash Transformation

def extract_log_message(line: str) -> str:
    """Extract just the message part of a log line."""
    # Skip timestamp and log level, keep only message
    parts = line.split(maxsplit=3)
    return parts[3] if len(parts) > 3 else line

uniqseq = UniqSeq(
    window_size=10,
    hash_transform=extract_log_message
)
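
To see what such a transform does before wiring it into UniqSeq, it can be called directly on sample lines. The log format below (date, time, level, message) is an assumption made for this sketch.

```python
def extract_log_message(line: str) -> str:
    """Drop the first three whitespace-separated fields, keep the rest."""
    parts = line.split(maxsplit=3)
    return parts[3] if len(parts) > 3 else line

# Assumed sample lines: same message, different timestamps.
first = "2024-01-15 10:30:01 ERROR Connection failed to db"
second = "2024-01-15 10:45:17 ERROR Connection failed to db"
short = "no-prefix"  # fewer than four fields: returned unchanged

print(extract_log_message(first))   # Connection failed to db
print(extract_log_message(first) == extract_log_message(second))  # True
print(extract_log_message(short))   # no-prefix
```

Note that this transform also discards the log level, so an ERROR line and a WARN line with the same message would compare equal.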

Library Mode

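import hashlib
from pathlib import Path

def save_callback(file_content: str) -> None:
    """Save a discovered sequence to disk."""
    output_dir = Path('~/sequences').expanduser()
    output_dir.mkdir(exist_ok=True)

    # The callback receives the raw content (with delimiters) and computes
    # its own hash; SHA-256 is an illustrative choice for the file name
    seq_hash = hashlib.sha256(file_content.encode()).hexdigest()[:16]
    filepath = output_dir / f'{seq_hash}.txt'
    filepath.write_text(file_content)

uniqseq = UniqSeq(
    window_size=10,
    save_sequence_callback=save_callback
)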

Preloaded Sequences

# Load known patterns to skip on first occurrence
# (a set of sequence contents, as described in the __init__ reference)
preloaded = {
    'Line 1\nLine 2\nLine 3',
    'Error A\nError B\nError C',
}

uniqseq = UniqSeq(
    window_size=3,
    preloaded_sequences=preloaded
)

Inverse Mode

# Keep only duplicates (for pattern analysis)
uniqseq = UniqSeq(
    window_size=10,
    inverse=True
)

# Process lines - will output only repeated sequences
for line in input_lines:
    uniqseq.process_line(line, sys.stdout)

uniqseq.flush_to_stream(sys.stdout)

Annotations

# Add markers where duplicates were skipped
uniqseq = UniqSeq(
    window_size=10,
    annotate=True,
    annotation_format='[SKIP: Lines {start}-{end}, seen {count}x]'
)
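
A quick way to preview a template is to render it by hand. This sketch assumes the template is filled in with str.format-style substitution, which the reference does not state explicitly, and the values are made up for illustration.

```python
template = "[SKIP: Lines {start}-{end}, seen {count}x]"

# Made-up values for the documented template variables.
values = {
    "start": 11,
    "end": 20,
    "match_start": 1,
    "match_end": 10,
    "count": 2,
    "window_size": 10,
}

# str.format ignores keyword arguments the template does not reference.
rendered = template.format(**values)
print(rendered)  # [SKIP: Lines 11-20, seen 2x]
```

Under this assumption, a placeholder name outside the documented variable list would raise a KeyError, so previewing a template this way can catch typos early.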

Memory Management

History Limits

The max_history parameter controls memory usage:

# Limited history (bounded memory)
uniqseq = UniqSeq(
    window_size=10,
    max_history=10000  # Track last 10k windows
)

# Unlimited history (for file processing)
uniqseq = UniqSeq(
    window_size=10,
    max_history=None  # No limit
)

Memory usage:

  • Each window hash: ~100 bytes
  • 10,000 windows ≈ 1 MB
  • 100,000 windows ≈ 10 MB
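
The figures above follow from simple multiplication. As a rough sizing aid, the sketch below applies the ~100 bytes per window estimate to a few history sizes; estimate_history_bytes is a hypothetical helper, not part of the library, and real usage will vary with the Python build and line content.

```python
BYTES_PER_WINDOW = 100  # approximate per-window cost quoted in this section

def estimate_history_bytes(max_history: int) -> int:
    """Rough memory estimate for a history of max_history window hashes."""
    return max_history * BYTES_PER_WINDOW

for n in (10_000, 50_000, 100_000):
    megabytes = estimate_history_bytes(n) / 1_000_000
    print(f"{n:>7,} windows ≈ {megabytes:.0f} MB")
```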

Unique Sequence Limits

The max_unique_sequences parameter limits unique patterns tracked:

uniqseq = UniqSeq(
    window_size=10,
    max_history=100000,
    max_unique_sequences=5000  # Track up to 5k unique patterns
)

When the limit is reached, the least recently used sequences are evicted (LRU).
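
For readers unfamiliar with LRU eviction, here is a generic sketch of the technique using collections.OrderedDict. It illustrates the policy only; LRUSet is a hypothetical class, and the library's internal data structures may differ.

```python
from collections import OrderedDict

class LRUSet:
    """Fixed-capacity set that evicts the least recently used key."""

    def __init__(self, capacity: int) -> None:
        self.capacity = capacity  # assumed to be at least 1
        self._items = OrderedDict()

    def touch(self, key: str) -> None:
        """Record a use of key, evicting the oldest entry if the set is full."""
        if key in self._items:
            self._items.move_to_end(key)  # mark as most recently used
            return
        if len(self._items) >= self.capacity:
            self._items.popitem(last=False)  # drop least recently used
        self._items[key] = None

    def __contains__(self, key: str) -> bool:
        return key in self._items

cache = LRUSet(capacity=2)
cache.touch("seq-a")
cache.touch("seq-b")
cache.touch("seq-a")  # seq-a is now the most recently used
cache.touch("seq-c")  # evicts seq-b
print("seq-a" in cache, "seq-b" in cache, "seq-c" in cache)  # True False True
```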

Statistics

get_stats()

Returns deduplication statistics:

stats = uniqseq.get_stats()

print(f"Total lines: {stats['total']}")
print(f"Emitted: {stats['emitted']}")
print(f"Skipped: {stats['skipped']}")
print(f"Redundancy: {stats['redundancy_pct']:.1f}%")
print(f"Unique sequences: {stats['unique_sequences']}")

Return value:

{
    'total': int,            # Total lines processed
    'emitted': int,          # Lines written to output
    'skipped': int,          # Lines skipped as duplicates
    'redundancy_pct': float, # Percentage of duplicates
    'unique_sequences': int  # Number of unique patterns found
}
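
The redundancy figure can be reproduced from the other counters. This sketch assumes redundancy_pct is the share of skipped lines among all processed lines, which matches the formula used in the progress callback example; the sample numbers are made up.

```python
def redundancy_pct(total: int, skipped: int) -> float:
    """Percentage of processed lines that were skipped (0.0 for empty input)."""
    return 100 * skipped / total if total > 0 else 0.0

# Made-up counters shaped like the get_stats() return value.
stats = {"total": 2000, "emitted": 1500, "skipped": 500}
stats["redundancy_pct"] = redundancy_pct(stats["total"], stats["skipped"])

print(f"Redundancy: {stats['redundancy_pct']:.1f}%")  # Redundancy: 25.0%
```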

Binary Mode

Process binary data with bytes instead of strings:

uniqseq = UniqSeq(
    window_size=10,
    delimiter=b'\n'  # Use bytes delimiter
)

# Process binary lines
with open('input.bin', 'rb') as f:
    for line in f:
        line = line.rstrip(b'\n')
        uniqseq.process_line(line, sys.stdout.buffer)

uniqseq.flush_to_stream(sys.stdout.buffer)

Progress Callbacks

Monitor processing progress:

def progress_callback(
    line_num: int, lines_skipped: int, seq_count: int
) -> None:
    """Called every 1000 lines."""
    redundancy = 100 * lines_skipped / line_num if line_num > 0 else 0
    print(f"Processed {line_num:,} lines, {redundancy:.1f}% redundancy",
          file=sys.stderr)

uniqseq = UniqSeq(window_size=10)

for line in input_lines:
    uniqseq.process_line(line, sys.stdout, progress_callback=progress_callback)

Performance Considerations

Window Size

  • Smaller windows (5-10 lines): More sensitive, finds shorter patterns
  • Larger windows (20-50 lines): Less sensitive, only finds longer patterns
  • Rule of thumb: Set to the minimum pattern length you want to detect
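
The effect of the window size can be shown with a deliberately simplified sketch that records every window of N consecutive lines and reports where a window repeats. This is not the library's algorithm (it ignores history limits, candidates, and position tracking); repeated_window_starts is a hypothetical helper.

```python
def repeated_window_starts(lines: list[str], window_size: int) -> list[int]:
    """Return start indexes of windows identical to an earlier window."""
    seen: set[tuple[str, ...]] = set()
    repeats: list[int] = []
    for start in range(len(lines) - window_size + 1):
        window = tuple(lines[start:start + window_size])
        if window in seen:
            repeats.append(start)
        else:
            seen.add(window)
    return repeats

lines = ["A", "B", "C", "A", "B", "C"]
print(repeated_window_starts(lines, 3))  # [3]
print(repeated_window_starts(lines, 4))  # []
```

With a window of 3 the repeated A, B, C block is found at index 3; with a window of 4 no window repeats, so the 3-line pattern goes undetected.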

History Size

  • Limited history (10k-100k): Fixed memory, may miss old duplicates
  • Unlimited history: Grows with unique patterns, best for files
  • Auto-detection: CLI auto-enables unlimited for files, limited for streams

Skip Characters

Using skip_chars is more efficient than hash_transform:

# Efficient: skip characters during hashing
uniqseq = UniqSeq(skip_chars=21)

# Less efficient: transform entire line
uniqseq = UniqSeq(
    hash_transform=lambda line: line[21:]
)
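
To illustrate why skipping a fixed-width prefix makes timestamped lines compare equal, the sketch below hashes lines with and without the prefix. The line_hash helper, the SHA-256 choice, and the timestamp format are assumptions for illustration, not the library's hashing scheme.

```python
import hashlib

def line_hash(line: str, skip_chars: int = 0) -> str:
    """Hash a line after dropping its first skip_chars characters."""
    return hashlib.sha256(line[skip_chars:].encode("utf-8")).hexdigest()

# Assumed fixed-width prefix: ISO timestamp plus one space.
prefix_len = len("2024-01-15T10:30:01Z ")

a = "2024-01-15T10:30:01Z Connection failed"
b = "2024-01-15T10:45:17Z Connection failed"

print(line_hash(a) == line_hash(b))                          # False
print(line_hash(a, prefix_len) == line_hash(b, prefix_len))  # True
```

This only works when the prefix has a constant width; for variable-width prefixes a hash_transform function is the more flexible option.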

Concurrent Processing

Each UniqSeq instance maintains internal state for a single stream. For parallel processing, create separate instances per stream:

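from concurrent.futures import ThreadPoolExecutor
from uniqseq import UniqSeq

file_list = ['app1.log', 'app2.log']  # Example paths

def process_file(filepath):
    # Each worker gets its own UniqSeq instance
    uniqseq = UniqSeq(window_size=10)
    with open(filepath) as f:
        for line in f:
            uniqseq.process_line(line.rstrip('\n'))
    uniqseq.flush_to_stream()  # Write remaining buffered lines to stdout
    return uniqseq.get_stats()

# Process multiple files in parallel; output from different
# workers may interleave because they all write to stdout
with ThreadPoolExecutor() as executor:
    results = list(executor.map(process_file, file_list))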

Note: Do not share a single UniqSeq instance across threads. Each stream requires its own instance to maintain correct state.

See Also