#!/usr/bin/env python3
"""
Command Line Interface for XRayLabTool.
This module provides a comprehensive CLI for calculating X-ray optical properties
of materials, including single material calculations, batch processing, utility
functions for X-ray analysis, and shell completion installation.
Available Commands:
calc Calculate X-ray properties for a single material
batch Process multiple materials from CSV file
compare Compare X-ray properties between multiple materials
convert Convert between energy and wavelength units
formula Parse and analyze chemical formulas
atomic Look up atomic data for elements
bragg Calculate Bragg angles for diffraction
list List available data and information
install-completion Install shell completion for xraylabtool
uninstall-completion Remove shell completion for xraylabtool
The CLI supports various output formats (table, CSV, JSON), field filtering,
precision control, and comprehensive shell completion for enhanced usability.
"""
# ruff: noqa: I001
import argparse
import json
import sys
from pathlib import Path
from textwrap import dedent
from typing import Any
import numpy as np
# Essential imports only - heavy modules imported lazily in functions
# pandas import moved to function level to reduce startup time
from xraylabtool import __version__
from xraylabtool.logging_utils import configure_logging, get_logger, log_environment
# These basic utilities are lightweight and used frequently
from xraylabtool.utils import (
bragg_angle,
energy_to_wavelength,
get_atomic_number,
get_atomic_weight,
parse_formula,
wavelength_to_energy,
)
# Heavy imports moved to lazy loading:
# - pandas: imported when needed for data processing (numpy is imported at module level above)
# - analysis modules: imported in cmd_compare function
# Stub implementations for removed monitoring/performance modules
[docs]
class MemoryMonitor:
    """No-op stand-in for the removed memory-monitoring module.

    Every method is intentionally empty so that call sites written against
    the monitoring interface keep working without doing any work.
    """

    def __init__(self) -> None:
        """Create the stub; there is no state to initialise."""

    def update(self) -> None:
        """Do nothing and return ``None``."""
        return None

    def print_summary(self) -> None:
        """Do nothing and return ``None``."""
        return None
[docs]
class AdaptiveChunkSizer:
    """Stub chunk sizer that never splits the work."""

    def __init__(self) -> None:
        """Create the stub; there is no state to initialise."""

    def calculate_chunk_size(self, total: int) -> int:
        """Return *total* unchanged, i.e. a single chunk covering everything."""
        chunk_size = total
        return chunk_size
[docs]
def create_batch_progress_tracker(**kwargs: Any) -> Any:
    """Return a do-nothing context manager in place of a progress tracker.

    All keyword arguments are accepted and ignored; entering the returned
    context yields ``None``.
    """
    import contextlib

    return contextlib.nullcontext()
# - progress modules: imported in cmd_batch function
# - validation modules: imported when needed
[docs]
def create_parser() -> argparse.ArgumentParser:
    """Build the top-level ``xraylabtool`` parser and attach every subcommand.

    Returns:
        The configured parser. Global flags (``--version``, ``-v/--verbose``,
        ``--debug``) and the completion-installation flags live on the root
        parser; the selected subcommand name is stored in ``args.command``.
    """
    usage_examples = dedent("""
        Examples:
        # Calculate properties for SiO2 at 10 keV
        xraylabtool calc SiO2 -e 10.0 -d 2.2
        # Energy sweep for silicon
        xraylabtool calc Si -e 5.0,10.0,15.0,20.0 -d 2.33 -o silicon_sweep.csv
        # Batch calculation from CSV file
        xraylabtool batch materials.csv -o results.csv
        # Convert energy to wavelength
        xraylabtool convert energy 10.0 --to wavelength
        # Parse chemical formula
        xraylabtool formula SiO2 --verbose
        # Install shell completion
        xraylabtool install-completion
        For more detailed help on specific commands, use:
        xraylabtool <command> --help
        """)
    root = argparse.ArgumentParser(
        prog="xraylabtool",
        description="X-ray optical properties calculator for materials science",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_examples,
    )

    # Global flags.
    root.add_argument(
        "--version",
        action="version",
        version=f"XRayLabTool {__version__}",
    )
    root.add_argument(
        "-v",
        "--verbose",
        action="store_true",
        help="Enable verbose output",
    )
    root.add_argument(
        "--debug",
        action="store_true",
        help="Enable debug mode for detailed error information",
    )

    # Flag-style completion installation (as opposed to the subcommands).
    completion_group = root.add_argument_group("completion installation")
    completion_group.add_argument(
        "--install-completion",
        nargs="?",
        const="auto",
        choices=["auto", "bash", "zsh", "fish", "powershell"],
        metavar="SHELL",
        help=(
            "Install shell completion for specified shell "
            "(auto-detects if not specified)"
        ),
    )
    modifier_flags = (
        ("--test", "Test completion installation (use with --install-completion)"),
        ("--system", "Install system-wide completion (use with --install-completion)"),
        ("--uninstall", "Uninstall completion (use with --install-completion)"),
    )
    for flag, flag_help in modifier_flags:
        completion_group.add_argument(flag, action="store_true", help=flag_help)

    subparsers = root.add_subparsers(
        dest="command", help="Available commands", metavar="COMMAND"
    )

    # Registration order is kept as-is: it determines the order in --help.
    registrars = (
        add_calc_command,
        add_batch_command,
        add_compare_command,
        add_convert_command,
        add_formula_command,
        add_atomic_command,
        add_bragg_command,
        add_list_command,
        add_completion_command,
        add_install_completion_command,
        add_uninstall_completion_command,
    )
    for register in registrars:
        register(subparsers)
    return root
[docs]
def add_calc_command(subparsers: Any) -> None:
    """Add the 'calc' subcommand for single material calculations.

    Args:
        subparsers: The object returned by ``ArgumentParser.add_subparsers``;
            the ``calc`` parser is registered on it.

    The parsed namespace gains ``formula``, ``energy`` (raw string),
    ``density`` (float), ``output``, ``format``, ``fields`` and ``precision``.
    """
    examples = dedent("""
        Examples:
        # Single energy calculation
        xraylabtool calc SiO2 -e 10.0 -d 2.2
        # Multiple energies (comma-separated)
        xraylabtool calc Si -e 5.0,10.0,15.0,20.0 -d 2.33
        # Energy range with linear spacing
        xraylabtool calc Al2O3 -e 5-15:11 -d 3.95
        # Energy range with log spacing
        xraylabtool calc C -e 1-30:100:log -d 3.52
        # Save results to file
        xraylabtool calc SiO2 -e 8.0,10.0,12.0 -d 2.2 -o results.csv
        # JSON output format
        xraylabtool calc Si -e 10.0 -d 2.33 -o results.json --format json
        """)
    energy_help = dedent("""
        X-ray energy in keV. Formats:
        - Single value: 10.0
        - Comma-separated: 5.0,10.0,15.0
        - Range with count: 5-15:11 (11 points from 5 to 15 keV)
        - Log range: 1-30:100:log (100 log-spaced points)
        """).strip()

    parser = subparsers.add_parser(
        "calc",
        help="Calculate X-ray properties for a single material",
        description=(
            "Calculate X-ray optical properties for a single material composition"
        ),
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=examples,
    )
    parser.add_argument("formula", help="Chemical formula (e.g., SiO2, Al2O3, Fe2O3)")
    parser.add_argument("-e", "--energy", required=True, help=energy_help)
    parser.add_argument(
        "-d", "--density", type=float, required=True, help="Material density in g/cm³"
    )
    parser.add_argument(
        "-o", "--output", help="Output filename (CSV or JSON based on extension)"
    )
    parser.add_argument(
        "--format",
        choices=["table", "csv", "json"],
        default="table",
        help="Output format (default: table)",
    )
    parser.add_argument(
        "--fields", help="Comma-separated list of fields to output (default: all)"
    )
    parser.add_argument(
        "--precision",
        type=int,
        default=6,
        help="Number of decimal places for output (default: 6)",
    )
    parser.add_argument(
        "--debug",
        action="store_true",
        # argparse copies every attribute of the subcommand namespace onto the
        # parent namespace. A plain False default here would therefore erase a
        # ``--debug`` given before the subcommand (``xraylabtool --debug calc
        # ...``). SUPPRESS leaves the attribute alone unless the flag is used.
        default=argparse.SUPPRESS,
        help="Enable debug mode for detailed error information",
    )
[docs]
def add_batch_command(subparsers: Any) -> None:
    """Add the 'batch' subcommand for processing multiple materials.

    Args:
        subparsers: The object returned by ``ArgumentParser.add_subparsers``;
            the ``batch`` parser is registered on it.

    The parsed namespace gains ``input_file``, ``output``, ``format``,
    ``workers``, ``fields``, ``progress`` and ``no_progress``.
    """
    examples = dedent("""
        Input CSV format:
        The input CSV file should have columns: formula, density, energy
        Example CSV content:
        formula,density,energy
        SiO2,2.2,10.0
        Al2O3,3.95,"5.0,10.0,15.0"
        Si,2.33,8.0
        Examples:
        # Process materials from CSV
        xraylabtool batch materials.csv -o results.csv
        # Specific output format
        xraylabtool batch materials.csv -o results.json --format json
        # Parallel processing with 4 workers
        xraylabtool batch materials.csv -o results.csv --workers 4
        """)

    parser = subparsers.add_parser(
        "batch",
        help="Process multiple materials from CSV file",
        description="Calculate X-ray properties for multiple materials from CSV input",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=examples,
    )
    parser.add_argument("input_file", help="Input CSV file with materials data")
    parser.add_argument(
        "-o",
        "--output",
        required=True,
        help="Output filename (CSV or JSON based on extension)",
    )
    parser.add_argument(
        "--format",
        choices=["csv", "json"],
        help="Output format (auto-detected from extension if not specified)",
    )
    parser.add_argument(
        "--workers", type=int, help="Number of parallel workers (default: auto)"
    )
    parser.add_argument(
        "--fields", help="Comma-separated list of fields to include in output"
    )
    parser.add_argument(
        "--debug",
        action="store_true",
        # argparse copies every attribute of the subcommand namespace onto the
        # parent namespace. A plain False default here would therefore erase a
        # ``--debug`` given before the subcommand (``xraylabtool --debug batch
        # ...``). SUPPRESS leaves the attribute alone unless the flag is used.
        default=argparse.SUPPRESS,
        help="Enable debug mode for detailed error information",
    )
    parser.add_argument(
        "--progress",
        action="store_true",
        help="Show progress bar during batch processing",
    )
    parser.add_argument(
        "--no-progress",
        action="store_true",
        help="Disable progress bar (overrides --progress)",
    )
[docs]
def add_convert_command(subparsers: Any) -> None:
    """Register the 'convert' subcommand (energy <-> wavelength conversion).

    The parsed namespace gains ``from_unit``, ``values`` (raw string),
    ``to_unit`` and ``output``.
    """
    unit_choices = ["energy", "wavelength"]
    examples = dedent("""
        Examples:
        # Convert energy to wavelength
        xraylabtool convert energy 10.0 --to wavelength
        # Convert wavelength to energy
        xraylabtool convert wavelength 1.24 --to energy
        # Multiple values
        xraylabtool convert energy 5.0,10.0,15.0 --to wavelength
        # Save to file
        xraylabtool convert energy 5.0,10.0,15.0 --to wavelength -o conversions.csv
        """)

    convert_parser = subparsers.add_parser(
        "convert",
        help="Convert between energy and wavelength units",
        description="Convert between X-ray energy and wavelength units",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=examples,
    )
    convert_parser.add_argument(
        "from_unit",
        choices=unit_choices,
        help="Input unit type",
    )
    convert_parser.add_argument(
        "values",
        help="Value(s) to convert (comma-separated for multiple)",
    )
    convert_parser.add_argument(
        "--to",
        dest="to_unit",
        choices=unit_choices,
        required=True,
        help="Output unit type",
    )
    convert_parser.add_argument(
        "-o",
        "--output",
        help="Output filename (CSV format)",
    )
[docs]
def add_atomic_command(subparsers: Any) -> None:
    """Register the 'atomic' subcommand (atomic data lookup).

    The parsed namespace gains ``elements`` (raw string) and ``output``.
    """
    examples = dedent("""
        Examples:
        # Single element
        xraylabtool atomic Si
        # Multiple elements
        xraylabtool atomic H,C,N,O,Si
        # Save to file
        xraylabtool atomic Si,Al,Fe -o atomic_data.csv
        """)

    atomic_parser = subparsers.add_parser(
        "atomic",
        help="Look up atomic data for elements",
        description="Look up atomic numbers, weights, and other properties",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=examples,
    )
    atomic_parser.add_argument(
        "elements",
        help="Element symbol(s) (comma-separated for multiple)",
    )
    atomic_parser.add_argument(
        "-o",
        "--output",
        help="Output filename (CSV or JSON based on extension)",
    )
[docs]
def add_bragg_command(subparsers: Any) -> None:
    """Register the 'bragg' subcommand (Bragg angle calculation).

    Exactly one of ``-w/--wavelength`` or ``-e/--energy`` must be supplied.
    The parsed namespace gains ``dspacing``, ``wavelength``, ``energy``,
    ``order`` and ``output``.
    """
    examples = dedent("""
        Examples:
        # Single calculation
        xraylabtool bragg -d 3.14 -w 1.54 --order 1
        # Multiple d-spacings
        xraylabtool bragg -d 3.14,2.45,1.92 -w 1.54
        # Energy instead of wavelength
        xraylabtool bragg -d 3.14 -e 8.0
        """)

    bragg_parser = subparsers.add_parser(
        "bragg",
        help="Calculate Bragg angles for diffraction",
        description="Calculate Bragg diffraction angles",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=examples,
    )
    bragg_parser.add_argument(
        "-d",
        "--dspacing",
        required=True,
        help="d-spacing in Angstroms (comma-separated for multiple)",
    )

    # The beam is described either by wavelength or by energy, never both.
    beam = bragg_parser.add_mutually_exclusive_group(required=True)
    beam.add_argument("-w", "--wavelength", help="X-ray wavelength in Angstroms")
    beam.add_argument("-e", "--energy", help="X-ray energy in keV")

    bragg_parser.add_argument(
        "--order",
        type=int,
        default=1,
        help="Diffraction order (default: 1)",
    )
    bragg_parser.add_argument(
        "-o",
        "--output",
        help="Output filename (CSV format)",
    )
[docs]
def add_list_command(subparsers: Any) -> None:
    """Register the 'list' subcommand.

    The single positional argument is stored as ``args.type`` and must be
    one of ``constants``, ``fields`` or ``examples``.
    """
    listable = ["constants", "fields", "examples"]
    list_parser = subparsers.add_parser(
        "list",
        help="List available data and information",
        description="List available elements, constants, or other information",
    )
    list_parser.add_argument(
        "type",
        choices=listable,
        help="Type of information to list",
    )
[docs]
def add_install_completion_command(subparsers: Any) -> None:
    """Register the 'install-completion' subcommand.

    The parsed namespace gains ``shell`` (``None`` when omitted), ``user``
    (defaults to ``True``), ``system``, ``test`` and ``uninstall``.
    """
    examples = dedent("""
        Examples:
        # Install completion for current shell (auto-detected)
        xraylabtool install-completion
        # Install for specific shell
        xraylabtool install-completion bash
        xraylabtool install-completion zsh
        xraylabtool install-completion fish
        # Install completion system-wide (requires sudo)
        xraylabtool install-completion --system
        # Test if completion is working
        xraylabtool install-completion --test
        # Uninstall completion
        xraylabtool install-completion --uninstall
        """)

    install_parser = subparsers.add_parser(
        "install-completion",
        help="Install shell completion for xraylabtool",
        description="Install shell completion for xraylabtool",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=examples,
    )

    # Optional positional: which shell to target.
    install_parser.add_argument(
        "shell",
        nargs="?",
        choices=["bash", "zsh", "fish", "powershell"],
        default=None,
        help="Shell type to install completion for (auto-detected if not specified)",
    )
    install_parser.add_argument(
        "--user",
        action="store_true",
        default=True,
        help="Install for current user only (default)",
    )

    plain_switches = (
        ("--system", "Install system-wide (requires sudo privileges)"),
        ("--test", "Test if completion is working"),
        ("--uninstall", "Uninstall existing completion"),
    )
    for switch, switch_help in plain_switches:
        install_parser.add_argument(switch, action="store_true", help=switch_help)
[docs]
def add_completion_command(subparsers: Any) -> None:
    """Register the 'completion' subcommand and its nested actions.

    The nested action name (``install``, ``uninstall``, ``list``, ``status``
    or ``info``) is stored in ``args.completion_action``.
    """
    overview = dedent("""
        Examples:
        # Install completion in current virtual environment
        xraylabtool completion install
        # Install for specific shell
        xraylabtool completion install --shell zsh
        # List all environments with completion status
        xraylabtool completion list
        # Show completion status for current environment
        xraylabtool completion status
        # Uninstall from current environment
        xraylabtool completion uninstall
        # Uninstall from all environments
        xraylabtool completion uninstall --all
        # Show system information
        xraylabtool completion info
        The new completion system:
        • Installs per virtual environment (no system-wide changes)
        • Automatically activates/deactivates with environment
        • Supports venv, conda, Poetry, Pipenv environments
        • Provides native completion for multiple shells
        """)
    env_help = "Target environment name (current environment if not specified)"

    completion_parser = subparsers.add_parser(
        "completion",
        help="Manage virtual environment-centric shell completion",
        description=(
            "Manage shell completion that activates/deactivates with virtual environments"
        ),
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=overview,
    )
    actions = completion_parser.add_subparsers(
        dest="completion_action",
        help="Available completion actions",
        metavar="ACTION",
    )

    # completion install
    install_action = actions.add_parser(
        "install", help="Install completion in virtual environment"
    )
    install_action.add_argument(
        "--shell",
        "-s",
        choices=["bash", "zsh", "fish", "powershell"],
        help="Shell type (auto-detected if not specified)",
    )
    install_action.add_argument("--env", "-e", help=env_help)
    install_action.add_argument(
        "--force",
        "-f",
        action="store_true",
        help="Force reinstallation if already installed",
    )

    # completion uninstall
    uninstall_action = actions.add_parser(
        "uninstall", help="Remove completion from environment(s)"
    )
    uninstall_action.add_argument("--env", "-e", help=env_help)
    uninstall_action.add_argument(
        "--all", action="store_true", help="Remove from all environments"
    )

    # Argument-less actions: list, status, info.
    simple_actions = (
        ("list", "List environments with completion status"),
        ("status", "Show completion status for current environment"),
        ("info", "Show information about the completion system"),
    )
    for action_name, action_help in simple_actions:
        actions.add_parser(action_name, help=action_help)
[docs]
def add_uninstall_completion_command(subparsers: Any) -> None:
    """Register the 'uninstall-completion' subcommand.

    The parsed namespace gains ``shell_type`` (``None`` when omitted),
    ``user`` (defaults to ``True``), ``system`` and ``cleanup``.
    """
    examples = dedent("""
        Examples:
        # Uninstall completion for current shell (auto-detected)
        xraylabtool uninstall-completion
        # Uninstall for specific shell
        xraylabtool uninstall-completion bash
        xraylabtool uninstall-completion zsh
        xraylabtool uninstall-completion fish
        # Uninstall system-wide completion (requires sudo)
        xraylabtool uninstall-completion --system
        # Clean up active session
        xraylabtool uninstall-completion --cleanup
        """)

    removal_parser = subparsers.add_parser(
        "uninstall-completion",
        help="Uninstall shell completion for xraylabtool",
        description="Remove shell completion functionality",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=examples,
    )
    removal_parser.add_argument(
        "shell_type",
        nargs="?",
        choices=["bash", "zsh", "fish", "powershell"],
        help="Shell type to remove completion from (auto-detected if not specified)",
    )
    removal_parser.add_argument(
        "--user",
        action="store_true",
        default=True,
        help="Remove from current user only (default)",
    )

    plain_switches = (
        ("--system", "Remove system-wide completion (requires sudo privileges)"),
        ("--cleanup", "Clean up active shell session"),
    )
    for switch, switch_help in plain_switches:
        removal_parser.add_argument(switch, action="store_true", help=switch_help)
[docs]
def add_compare_command(subparsers: Any) -> None:
    """Register the 'compare' subcommand (side-by-side material comparison).

    The parsed namespace gains ``materials`` (list of raw
    ``formula,density`` strings), ``energy`` (raw string), ``properties``,
    ``output``, ``format``, ``report`` and ``precision``.
    """
    examples = dedent("""
        Examples:
        # Compare two materials at single energy
        xraylabtool compare SiO2,2.2 Al2O3,3.95 -e 10.0
        # Compare materials across energy range
        xraylabtool compare Si,2.33 Ge,5.32 -e 5-15:11
        # Compare specific properties
        xraylabtool compare SiO2,2.2 Si3N4,3.2 -e 8.0,10.0,12.0 --properties dispersion_delta,absorption_beta
        # Save comparison to file
        xraylabtool compare SiO2,2.2 Al2O3,3.95 -e 10.0 -o comparison.csv
        # Generate detailed report
        xraylabtool compare Si,2.33 GaAs,5.32 -e 10.0 --report --output comparison_report.txt
        """)

    compare_parser = subparsers.add_parser(
        "compare",
        help="Compare X-ray properties between multiple materials",
        description=(
            "Compare X-ray optical properties across multiple materials with side-by-side analysis"
        ),
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=examples,
    )
    compare_parser.add_argument(
        "materials",
        nargs="+",
        help="Materials in format 'formula,density' (e.g., SiO2,2.2 Al2O3,3.95)",
    )
    compare_parser.add_argument(
        "-e",
        "--energy",
        required=True,
        help="X-ray energy in keV (single value, comma-separated, or range format)",
    )
    compare_parser.add_argument(
        "--properties",
        help=(
            "Comma-separated list of properties to compare (default: all standard properties)"
        ),
    )
    compare_parser.add_argument(
        "-o",
        "--output",
        help="Output filename for comparison results",
    )
    compare_parser.add_argument(
        "--format",
        choices=["table", "csv", "json"],
        default="table",
        help="Output format (default: table)",
    )
    compare_parser.add_argument(
        "--report",
        action="store_true",
        help="Generate detailed comparison report",
    )
    compare_parser.add_argument(
        "--precision",
        type=int,
        default=6,
        help="Number of decimal places for output (default: 6)",
    )
[docs]
def _split_energy_range(range_part: str) -> tuple[float, float]:
    """Split ``"start-end"`` into two floats, tolerating exponents like ``1e-3``.

    Every ``-`` after the first character is tried as the separator; the
    first one for which both sides parse as floats wins.
    """
    for position, char in enumerate(range_part):
        if char != "-" or position == 0:
            continue
        try:
            return float(range_part[:position]), float(range_part[position + 1 :])
        except ValueError:
            # This "-" belongs to an exponent (or the text is malformed);
            # keep looking for a later separator.
            continue
    raise ValueError(f"Invalid energy range '{range_part}': expected 'start-end'")


def parse_energy_string(energy_str: str) -> np.ndarray:
    """Parse an energy specification string into a NumPy array.

    Supported forms:
        * ``"10.0"`` - a single value
        * ``"5.0,10.0,15.0"`` - comma-separated values
        * ``"5-15:11"`` - 11 linearly spaced points from 5 to 15
        * ``"1-30:100:log"`` - 100 logarithmically spaced points from 1 to 30

    Range bounds may use scientific notation (for example ``"1e-1-1e1:3"``).

    Args:
        energy_str: The raw specification.

    Returns:
        A one-dimensional array of energies.

    Raises:
        ValueError: If a number cannot be parsed, the point count is below 1,
            or a log-spaced range has a non-positive bound.
    """
    if "," in energy_str:
        return np.array([float(token.strip()) for token in energy_str.split(",")])

    if "-" in energy_str and ":" in energy_str:
        parts = energy_str.split(":")
        start, end = _split_energy_range(parts[0])
        count = int(parts[1])
        if count < 1:
            raise ValueError(f"Energy point count must be at least 1, got {count}")
        spacing = parts[2].strip().lower() if len(parts) > 2 else "linear"
        if spacing == "log":
            # log10 of a non-positive bound would silently yield NaN/-inf.
            if start <= 0 or end <= 0:
                raise ValueError("Log-spaced energy range requires positive bounds")
            return np.logspace(np.log10(start), np.log10(end), count)
        return np.linspace(start, end, count)

    return np.array([float(energy_str)])
def _get_default_fields() -> tuple[list[str], list[str]]:
"""Get default scalar and array fields."""
array_fields = [
"energy_kev",
"wavelength_angstrom",
"dispersion_delta",
"absorption_beta",
"scattering_factor_f1",
"scattering_factor_f2",
"critical_angle_degrees",
"attenuation_length_cm",
"real_sld_per_ang2",
"imaginary_sld_per_ang2",
]
scalar_fields = [
"formula",
"molecular_weight_g_mol",
"total_electrons",
"density_g_cm3",
"electron_density_per_ang3",
]
return scalar_fields, array_fields
def _format_as_json(result: Any, fields: list[str]) -> str:
"""Format result as JSON."""
import numpy as np
data = {}
for field in fields:
value = getattr(result, field)
if isinstance(value, np.ndarray):
data[field] = value.tolist()
else:
data[field] = value
return json.dumps(data, indent=2)
def _format_as_csv(result: Any, fields: list[str], precision: int) -> str:
"""Format result as CSV."""
import csv
import io
import numpy as np
n_energies = len(result.energy_kev)
# Vectorized approach: separate array and scalar fields for efficiency
array_fields = [f for f in fields if isinstance(getattr(result, f), np.ndarray)]
scalar_fields = [
f for f in fields if not isinstance(getattr(result, f), np.ndarray)
]
# Vectorize array operations
data_arrays = {
field: np.round(getattr(result, field), precision) for field in array_fields
}
scalar_data = {field: getattr(result, field) for field in scalar_fields}
# Create rows efficiently using vectorized data
data_rows = [
{
**scalar_data,
**{field: float(data_arrays[field][i]) for field in array_fields},
}
for i in range(n_energies)
]
if data_rows:
# Use CSV module instead of pandas
output = io.StringIO()
fieldnames = fields
writer = csv.DictWriter(output, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(data_rows)
return output.getvalue()
return ""
def _format_material_properties(result: Any, precision: int) -> list[str]:
"""Format material properties section."""
return [
"Material Properties:",
f" Formula: {result.formula}",
f" Molecular Weight: {result.molecular_weight_g_mol: .{precision}f} g/mol",
f" Total Electrons: {result.total_electrons: .{precision}f}",
f" Density: {result.density_g_cm3: .{precision}f} g/cm³",
(
f" Electron Density: {result.electron_density_per_ang3: .{precision}e} "
"electrons/ų"
),
"",
]
def _format_single_energy(result: Any, precision: int) -> list[str]:
"""Format single energy point properties."""
return [
"X-ray Properties:",
f" Energy: {result.energy_kev[0]:.{precision}f} keV",
f" Wavelength: {result.wavelength_angstrom[0]:.{precision}f} Å",
f" Dispersion (δ): {result.dispersion_delta[0]:.{precision}e}",
f" Absorption (β): {result.absorption_beta[0]:.{precision}e}",
f" Scattering f1: {result.scattering_factor_f1[0]:.{precision}f}",
f" Scattering f2: {result.scattering_factor_f2[0]:.{precision}f}",
f" Critical Angle: {result.critical_angle_degrees[0]:.{precision}f}°",
f" Attenuation Length: {result.attenuation_length_cm[0]:.{precision}f} cm",
f" Real SLD: {result.real_sld_per_ang2[0]:.{precision}e} Å⁻²",
f" Imaginary SLD: {result.imaginary_sld_per_ang2[0]:.{precision}e} Å⁻²",
]
def _format_multiple_energies(result: Any, precision: int) -> list[str]:
"""Format multiple energy points as table."""
import numpy as np
output_lines = ["X-ray Properties (tabular):"]
# Create table without pandas
headers = ["Energy (keV)", "λ (Å)", "δ", "β", "f1", "f2", "θc (°)", "μ (cm)"]
data_arrays = [
result.energy_kev,
result.wavelength_angstrom,
result.dispersion_delta,
result.absorption_beta,
result.scattering_factor_f1,
result.scattering_factor_f2,
result.critical_angle_degrees,
result.attenuation_length_cm,
]
# Calculate column widths
col_widths = [max(len(header), 12) for header in headers]
# Format header
header_line = " ".join(
header.ljust(width) for header, width in zip(headers, col_widths, strict=False)
)
output_lines.append(header_line)
# Format data rows
n_energies = len(result.energy_kev)
for i in range(n_energies):
row_values = []
for data_array in data_arrays:
value = data_array[i] if isinstance(data_array, np.ndarray) else data_array
row_values.append(f"{value:.{precision}g}")
row_line = " ".join(
val.ljust(width) for val, width in zip(row_values, col_widths, strict=False)
)
output_lines.append(row_line)
return output_lines
def _format_scalar_field(field: str, value: Any, precision: int) -> str:
"""Format a single scalar field."""
from collections.abc import Callable
def default_formatter(v: Any, p: int) -> str:
return ""
formatters: dict[str, Callable[[Any, int], str]] = {
"formula": lambda v, _: f" Formula: {v}",
"molecular_weight_g_mol": lambda v, p: f" Molecular Weight: {v: .{p}f} g/mol",
"total_electrons": lambda v, p: f" Total Electrons: {v: .{p}f}",
"density_g_cm3": lambda v, p: f" Density: {v: .{p}f} g/cm³",
"electron_density_per_ang3": lambda v, p: (
f" Electron Density: {v: .{p}e} electrons/ų"
),
}
formatter = formatters.get(field, default_formatter)
return formatter(value, precision)
def _format_array_field_single(field: str, value: float, precision: int) -> str:
"""Format a single array field for single energy point."""
formatters: dict[str, tuple[str, str, str]] = {
"energy_kev": (" Energy:", "f", " keV"),
"wavelength_angstrom": (" Wavelength:", "f", " Å"),
"dispersion_delta": (" Dispersion (δ):", "e", ""),
"absorption_beta": (" Absorption (β):", "e", ""),
"scattering_factor_f1": (" Scattering f1:", "f", ""),
"scattering_factor_f2": (" Scattering f2:", "f", ""),
"critical_angle_degrees": (" Critical Angle:", "f", "°"),
"attenuation_length_cm": (" Attenuation Length:", "f", " cm"),
"real_sld_per_ang2": (" Real SLD:", "e", " Å⁻²"),
"imaginary_sld_per_ang2": (" Imaginary SLD:", "e", " Å⁻²"),
}
if field in formatters:
label, fmt, suffix = formatters[field]
return f"{label} {value: .{precision}{fmt}}{suffix}"
return ""
def _get_field_labels() -> dict[str, str]:
"""Get mapping of field names to display labels."""
return {
"energy_kev": "Energy (keV)",
"wavelength_angstrom": "λ (Å)",
"dispersion_delta": "δ",
"absorption_beta": "β",
"scattering_factor_f1": "f1",
"scattering_factor_f2": "f2",
"critical_angle_degrees": "θc (°)",
"attenuation_length_cm": "μ (cm)",
"real_sld_per_ang2": "Real SLD",
"imaginary_sld_per_ang2": "Imag SLD",
}
def _format_scalar_fields_section(
    result: Any, fields_to_show: list[str], precision: int
) -> list[str]:
    """Build the "Material Properties" section for the selected scalar fields.

    Returns an empty list when no fields are selected; otherwise a heading,
    the non-empty formatted lines, and a trailing empty string.
    """
    if not fields_to_show:
        return []

    rendered = (
        _format_scalar_field(name, getattr(result, name), precision)
        for name in fields_to_show
    )
    # Fields the formatter does not recognise come back as "" and are dropped.
    return ["Material Properties:", *(line for line in rendered if line), ""]
def _format_single_energy_section(
    result: Any, fields_to_show: list[str], precision: int
) -> list[str]:
    """Build the "X-ray Properties" section for one energy point.

    Only element ``[0]`` of each selected attribute is read. Returns an empty
    list when no fields are selected.
    """
    if not fields_to_show:
        return []

    section = ["X-ray Properties:"]
    rendered = (
        _format_array_field_single(name, getattr(result, name)[0], precision)
        for name in fields_to_show
    )
    # filter(None, ...) drops the "" returned for unrecognised fields.
    section.extend(filter(None, rendered))
    return section
def _format_multiple_energy_section(
    result: Any, fields_to_show: list[str], precision: int
) -> list[str]:
    """Build a plain-text table of the selected per-energy fields.

    Returns an empty list when no fields are selected; otherwise a title
    line, a header line, and one line per element of the first selected
    column. Fields without a known label use their own name as the header.
    """
    import numpy as np

    if not fields_to_show:
        return []

    labels = _get_field_labels()
    titles = []
    columns = []
    for name in fields_to_show:
        titles.append(labels.get(name, name))
        columns.append(getattr(result, name))
    widths = [max(len(title), 12) for title in titles]

    def render(cells: list[str]) -> str:
        return " ".join(
            cell.ljust(width) for cell, width in zip(cells, widths, strict=False)
        )

    lines = ["X-ray Properties (tabular):", render(titles)]
    # The first column decides how many rows are printed.
    for row_idx in range(len(columns[0])):
        cells = []
        for column in columns:
            entry = column[row_idx] if isinstance(column, np.ndarray) else column
            cells.append(f"{entry:.{precision}g}")
        lines.append(render(cells))
    return lines
def _format_filtered_table(result: Any, fields: list[str], precision: int) -> str:
    """Render only the requested *fields* of *result* as table text.

    Requested names are sorted into the default scalar and per-energy groups
    (names in neither group are ignored). The per-energy group is rendered as
    labelled lines when there is exactly one energy point, and as a table
    otherwise.
    """
    known_scalars, known_arrays = _get_default_fields()
    chosen_scalars = [name for name in fields if name in known_scalars]
    chosen_arrays = [name for name in fields if name in known_arrays]

    lines = list(_format_scalar_fields_section(result, chosen_scalars, precision))
    if chosen_arrays:
        single_point = len(result.energy_kev) == 1
        render_arrays = (
            _format_single_energy_section
            if single_point
            else _format_multiple_energy_section
        )
        lines += render_arrays(result, chosen_arrays, precision)
    return "\n".join(lines)
def _validate_calc_inputs(args: Any, energies: Any) -> bool:
"""Validate calculation inputs."""
import numpy as np
if args.density <= 0:
print("Error: Density must be positive", file=sys.stderr)
return False
if np.any(energies <= 0):
print("Error: All energies must be positive", file=sys.stderr)
return False
if np.any(energies < 0.03) or np.any(energies > 30):
print("Warning: Energy values outside typical X-ray range (0.03-30 keV)")
return True
def _print_calc_verbose_info(args: Any, energies: Any) -> None:
"""Print verbose calculation information."""
print(f"Calculating X-ray properties for {args.formula}...")
print(
f"Energy range: {energies.min(): .3f} - {energies.max(): .3f} keV "
f"({len(energies)} points)"
)
print(f"Density: {args.density} g/cm³")
print()
def _determine_output_format(args: Any) -> str:
"""Determine output format based on args and file extension."""
output_format: str = args.format
if args.output:
output_path = Path(args.output)
if not output_format or output_format == "table":
if output_path.suffix.lower() == ".json":
output_format = "json"
elif output_path.suffix.lower() == ".csv":
output_format = "csv"
return output_format
def _save_or_print_output(formatted_output: str, args: Any) -> None:
"""Save output to file or print to stdout."""
if args.output:
Path(args.output).write_text(formatted_output)
if args.verbose:
print(f"Results saved to {args.output}")
else:
print(formatted_output)
[docs]
def cmd_calc(args: Any) -> int:
    """Run the 'calc' subcommand.

    Validates the formula and density, parses the energy specification,
    computes the material properties, then formats and emits the result.

    Returns:
        ``0`` on success, ``1`` after any validation failure or exception
        (the message is printed to stderr; with ``args.debug`` set, the full
        traceback is printed as well).
    """
    try:
        # Imported here so they are only loaded when this command runs.
        from xraylabtool.calculators.core import calculate_single_material_properties
        from xraylabtool.validation import validate_chemical_formula, validate_density

        # (attribute, label used in the error message, validator)
        validations = (
            ("formula", "chemical formula", validate_chemical_formula),
            ("density", "density", validate_density),
        )
        for attr_name, label, validator in validations:
            try:
                validator(getattr(args, attr_name))
            except Exception as exc:
                print(
                    f"Error: Invalid {label} '{getattr(args, attr_name)}': {exc}",
                    file=sys.stderr,
                )
                return 1

        energies = parse_energy_string(args.energy)
        if not _validate_calc_inputs(args, energies):
            return 1
        if args.verbose:
            _print_calc_verbose_info(args, energies)

        result = calculate_single_material_properties(
            args.formula, energies, args.density
        )

        selected_fields = (
            [name.strip() for name in args.fields.split(",")] if args.fields else None
        )
        rendered = format_xray_result(
            result, _determine_output_format(args), args.precision, selected_fields
        )
        _save_or_print_output(rendered, args)
    except Exception as exc:
        if getattr(args, "debug", False):
            import traceback

            print("🔍 Debug: Full traceback:", file=sys.stderr)
            traceback.print_exc()
        print(f"Error: {exc}", file=sys.stderr)
        return 1
    return 0
def _validate_batch_input(args: Any) -> list[Any] | None:
"""Validate batch input file and return data."""
import csv
input_path = Path(args.input_file)
if not input_path.exists():
print(f"Error: Input file {args.input_file} not found", file=sys.stderr)
return None
try:
# Read CSV using standard library
with open(input_path, newline="", encoding="utf-8") as f:
reader = csv.DictReader(f)
data_rows = list(reader)
if not data_rows:
print("Error: Input file is empty", file=sys.stderr)
return None
# Check for required columns
required_columns = ["formula", "density", "energy"]
actual_columns = set(data_rows[0].keys()) if data_rows else set()
missing_columns = [col for col in required_columns if col not in actual_columns]
if missing_columns:
print(
f"Error: Missing required columns: {missing_columns}", file=sys.stderr
)
return None
return data_rows
except Exception as e:
print(f"Error reading input file: {e}", file=sys.stderr)
return None
def _parse_batch_data(
data_input: list[Any],
) -> tuple[list[str] | None, list[float] | None, list[list[float]] | None]:
"""Parse batch data from list of dictionaries."""
formulas = []
densities = []
energy_sets = []
for row in data_input:
formulas.append(row["formula"])
densities.append(float(row["density"]))
energy_str = str(row["energy"])
try:
if "," in energy_str:
energies = [float(x.strip()) for x in energy_str.split(",")]
else:
energies = [float(energy_str)]
energy_sets.append(energies)
except ValueError:
print(
f"Error: Invalid energy format for {row['formula']}: {energy_str}",
file=sys.stderr,
)
return None, None, None
return formulas, densities, energy_sets
def _convert_result_to_dict(result: Any, energy_index: int) -> dict[str, Any]:
    """Flatten one energy point of a calculation result into a plain dict.

    Attributes that hold one value per energy are indexed with
    ``energy_index``; the remaining attributes are copied unchanged. The key
    order is fixed because it determines the column order of CSV output.
    """
    # (attribute name, True when the attribute is indexed per energy point)
    layout = (
        ("formula", False),
        ("density_g_cm3", False),
        ("energy_kev", True),
        ("wavelength_angstrom", True),
        ("molecular_weight_g_mol", False),
        ("total_electrons", False),
        ("electron_density_per_ang3", False),
        ("dispersion_delta", True),
        ("absorption_beta", True),
        ("scattering_factor_f1", True),
        ("scattering_factor_f2", True),
        ("critical_angle_degrees", True),
        ("attenuation_length_cm", True),
        ("real_sld_per_ang2", True),
        ("imaginary_sld_per_ang2", True),
    )

    flattened: dict[str, Any] = {}
    for attribute, per_energy in layout:
        value = getattr(result, attribute)
        flattened[attribute] = value[energy_index] if per_energy else value
    return flattened
def _process_batch_materials(
    formulas: list[str],
    densities: list[float],
    energy_sets: list[list[float]],
    args: Any,
) -> list[dict[str, Any]]:
    """Calculate X-ray properties for every material in the batch.

    Each material is processed independently: a failure is reported and the
    material is skipped, so one bad row does not abort the whole batch.

    Args:
        formulas: Chemical formula of each material.
        densities: Density of each material, parallel to ``formulas``.
        energy_sets: Energies to evaluate for each material, parallel to
            ``formulas``.
        args: Parsed CLI namespace. ``args.verbose`` is read directly;
            ``progress`` and ``no_progress`` are optional attributes.

    Returns:
        One dictionary per successfully calculated (material, energy) pair,
        in input order.
    """
    # Imported lazily to keep CLI start-up fast.
    from xraylabtool.calculators.core import calculate_single_material_properties

    results: list[dict[str, Any]] = []
    total = len(formulas)

    # Progress is shown when requested, or automatically for batches of more
    # than 10 materials; an explicit no_progress always wins.
    no_progress = getattr(args, "no_progress", False)
    enable_progress = bool(
        not no_progress and (getattr(args, "progress", False) or total > 10)
    )

    memory_monitor = MemoryMonitor()
    performance_metrics = PerformanceMetrics()
    chunk_sizer = AdaptiveChunkSizer()

    if args.verbose:
        print(f"Processing {total} materials...")
        if enable_progress:
            print("Progress tracking enabled")

    # Failures raised while the progress bar is active are not printed
    # immediately (that would garble the bar). They are collected here and
    # reported afterwards so they are never lost silently.
    deferred_failures: list[tuple[str, str]] = []

    with create_batch_progress_tracker(
        total_items=total,
        desc="Processing materials",
        verbose=args.verbose,
        disable_progress=not enable_progress,
    ) as progress:
        for i, (formula, density, energies) in enumerate(
            zip(formulas, densities, energy_sets, strict=False)
        ):
            try:
                memory_monitor.update()

                with performance_metrics.time_operation():
                    if args.verbose and not enable_progress:
                        print(f" {i + 1}/{total}: {formula}")
                    result = calculate_single_material_properties(
                        formula, energies, density
                    )
                    for j in range(len(energies)):
                        results.append(_convert_result_to_dict(result, j))

                performance_metrics.record_operations(len(energies))
            except Exception as e:
                if enable_progress:
                    deferred_failures.append((formula, str(e)))
                else:
                    print(f"Warning: Failed to process {formula}: {e}")
            finally:
                # Advance the tracker for failed materials too, so it always
                # reaches the total.
                progress.update(1)

    if deferred_failures:
        print(
            f"Warning: Failed to process {len(deferred_failures)} of {total}"
            " materials:",
            file=sys.stderr,
        )
        for formula, reason in deferred_failures:
            print(f"  {formula}: {reason}", file=sys.stderr)

    if args.verbose:
        print("\n" + "=" * 50)
        performance_metrics.print_summary(verbose=True)
        memory_monitor.print_summary()

        # Show chunk sizing recommendation for future runs
        recommended_chunk = chunk_sizer.calculate_chunk_size(total)
        if total > recommended_chunk:
            print(
                "💡 For optimal memory usage, consider processing in chunks of"
                f" {recommended_chunk}"
            )

    return results
def _save_batch_results(results: list[dict[str, Any]], args: Any) -> None:
    """Write batch results to ``args.output`` as JSON or CSV.

    Args:
        results: One dictionary per calculated data point.
        args: Parsed CLI namespace. Reads ``fields`` (optional comma-separated
            list of keys to keep), ``format`` (``"json"``, ``"csv"`` or a
            falsy value meaning "decide from the file suffix"), ``output``
            (destination path) and ``verbose``.

    When no format is given, a ``.json`` suffix selects JSON and anything
    else selects CSV.
    """
    # Count materials before filtering: the field filter may remove the
    # "formula" key, which previously made the verbose summary raise KeyError.
    material_count = len({row["formula"] for row in results if "formula" in row})

    if args.fields:
        wanted = {field.strip() for field in args.fields.split(",")}
        results = [
            {key: value for key, value in row.items() if key in wanted}
            for row in results
        ]

    output_format = args.format
    if not output_format:
        suffix = Path(args.output).suffix.lower()
        output_format = "json" if suffix == ".json" else "csv"

    if output_format == "json":
        with open(args.output, "w", encoding="utf-8") as f:
            json.dump(results, f, indent=2)
    else:
        # Write CSV without pandas
        import csv

        if results:
            with open(args.output, "w", newline="", encoding="utf-8") as f:
                writer = csv.DictWriter(f, fieldnames=list(results[0]))
                writer.writeheader()
                writer.writerows(results)

    if args.verbose:
        print(f"Results saved to {args.output}")
        print(
            f"Processed {len(results)} data points from "
            f"{material_count} unique materials"
        )
[docs]
def cmd_batch(args: Any) -> int:
    """Handle the 'batch' command.

    Reads the input CSV, validates the formulas and applies automatic
    corrections where the recovery manager provides one, calculates the
    properties of every material and saves the results.

    Args:
        args: Parsed CLI namespace. ``verbose`` is read directly and ``debug``
            is optional; the remaining attributes are consumed by the batch
            helper functions.

    Returns:
        0 on success, 1 if the input is unusable, nothing could be processed,
        or an unexpected error occurred.
    """
    try:
        # Lazy imports for batch processing
        from xraylabtool.validation.enhanced_validator import EnhancedValidator  # type: ignore[import-not-found]
        from xraylabtool.validation.error_recovery import ErrorRecoveryManager  # type: ignore[import-not-found]

        batch_rows = _validate_batch_input(args)
        if batch_rows is None:
            return 1

        # Initialize enhanced error handling for batch processing
        debug_mode = getattr(args, "debug", False)
        validator = EnhancedValidator(debug=debug_mode)
        # Non-interactive: a batch run must not stop to prompt the user.
        recovery_manager = ErrorRecoveryManager(validator, interactive=False)

        # The rows are plain dictionaries (a list, not a DataFrame), so the
        # formula column is gathered with a comprehension.
        formulas = [row["formula"] for row in batch_rows]
        validation_results = validator.validate_batch_formulas(
            formulas, command_context="batch"
        )

        # Try to recover from validation errors
        recovered_formulas = recovery_manager.recover_batch_errors(
            validation_results, "batch processing", fail_fast=False
        )

        # Write corrected formulas back into their rows
        for i, (original_formula, recovered_formula) in enumerate(
            zip(formulas, recovered_formulas, strict=False)
        ):
            if recovered_formula and recovered_formula != original_formula:
                batch_rows[i]["formula"] = recovered_formula
                if args.verbose:
                    print(
                        f"✅ Auto-corrected formula {i + 1}: '{original_formula}' →"
                        f" '{recovered_formula}'"
                    )
            elif not recovered_formula:
                # The row itself is left unchanged; only a notice is printed.
                if args.verbose:
                    print(
                        f"⚠️ Could not process formula {i + 1}: '{original_formula}' -"
                        " skipping"
                    )

        # Generate batch improvement suggestions
        batch_suggestions = recovery_manager.suggest_batch_improvements(
            validation_results
        )
        if batch_suggestions["status"] == "errors_found" and (
            args.verbose or debug_mode
        ):
            print("\n📊 Batch Processing Summary:")
            print(f" Total items: {batch_suggestions['summary']['total_items']}")
            print(f" Success rate: {batch_suggestions['summary']['success_rate']}")
            if batch_suggestions["recommendations"]:
                print(" Recommendations:")
                for rec in batch_suggestions["recommendations"]:
                    print(f" • {rec}")
            print()

        formulas, densities, energy_sets = _parse_batch_data(batch_rows)
        # The parser returns None for all three parts on failure. Checking
        # each one explicitly also narrows the types without relying on
        # assert, which is stripped under "python -O".
        if formulas is None or densities is None or energy_sets is None:
            return 1

        results = _process_batch_materials(formulas, densities, energy_sets, args)
        if not results:
            print("Error: No materials were successfully processed", file=sys.stderr)
            return 1

        _save_batch_results(results, args)

        # Show recovery statistics if in verbose or debug mode
        if args.verbose or debug_mode:
            recovery_stats = recovery_manager.get_recovery_stats()
            if recovery_stats["total_errors"] > 0:
                print("\n📈 Error Recovery Statistics:")
                print(f" Total errors encountered: {recovery_stats['total_errors']}")
                print(f" Auto-recovery rate: {recovery_stats['auto_recovery_rate']}")
                print(
                    " Overall recovery rate:"
                    f" {recovery_stats['overall_recovery_rate']}"
                )
        return 0
    except Exception as e:
        # Top-level boundary of the command: report and convert to exit code.
        debug_mode = getattr(args, "debug", False)
        if debug_mode:
            import traceback

            print("🔍 Debug: Full traceback:", file=sys.stderr)
            traceback.print_exc()
        print(f"Error: {e}", file=sys.stderr)
        return 1
[docs]
def cmd_convert(args: Any) -> int:
    """Handle the 'convert' command.

    Converts the comma-separated numbers in ``args.values`` from energy to
    wavelength or from wavelength to energy, as selected by ``args.from_unit``
    and ``args.to_unit``. The result is written to ``args.output`` as CSV
    when that is set, otherwise printed as a table.

    Returns:
        0 on success, 1 for an unsupported unit pair or any other error.
    """
    try:
        inputs = [float(token.strip()) for token in args.values.split(",")]

        direction = (args.from_unit, args.to_unit)
        if direction == ("energy", "wavelength"):
            outputs = [energy_to_wavelength(value) for value in inputs]
            unit_label = "Å"
        elif direction == ("wavelength", "energy"):
            outputs = [wavelength_to_energy(value) for value in inputs]
            unit_label = "keV"
        else:
            print(
                f"Error: Cannot convert from {args.from_unit} to {args.to_unit}",
                file=sys.stderr,
            )
            return 1

        if not args.output:
            # Console table
            print(f"{args.from_unit.title()} to {args.to_unit.title()} Conversion:")
            print("-" * 40)
            for source_value, target_value in zip(inputs, outputs, strict=False):
                print(f"{source_value: >10.4f} → {target_value: >10.4f} {unit_label}")
            return 0

        # CSV file: one header row followed by one row per converted value
        import csv

        with open(args.output, "w", newline="", encoding="utf-8") as f:
            header = [f"{args.from_unit}", f"{args.to_unit} ({unit_label})"]
            sheet = csv.writer(f)
            sheet.writerow(header)
            for source_value, target_value in zip(inputs, outputs, strict=False):
                sheet.writerow([source_value, target_value])
        print(f"Conversion results saved to {args.output}")
        return 0
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        return 1
def _get_atomic_data(elements: list[str]) -> list[dict[str, Any]]:
    """Collect atomic number and atomic weight for each element symbol.

    Elements whose lookup fails are reported with a warning on stdout and
    left out of the returned list; the remaining elements keep their order.
    """
    collected: list[dict[str, Any]] = []
    for symbol in elements:
        try:
            entry = {
                "element": symbol,
                "atomic_number": get_atomic_number(symbol),
                "atomic_weight": get_atomic_weight(symbol),
            }
        except Exception as e:
            print(f"Warning: Could not get atomic data for {symbol}: {e}")
        else:
            collected.append(entry)
    return collected
def _process_formula(formula: str, verbose: bool) -> dict[str, Any]:
    """Parse one chemical formula and summarize its composition.

    The summary holds the formula, its elements and their counts, the number
    of distinct elements and the sum of all counts. With ``verbose`` set, the
    per-element atomic data is added under ``"atomic_data"``.
    """
    symbols, amounts = parse_formula(formula)
    summary: dict[str, Any] = dict(
        formula=formula,
        elements=symbols,
        counts=amounts,
        element_count=len(symbols),
        total_atoms=sum(amounts),
    )
    if verbose:
        summary["atomic_data"] = _get_atomic_data(symbols)
    return summary
def _output_formula_results(results: list[dict[str, Any]], args: Any) -> None:
    """Send formula analysis results to a JSON file or to the console.

    When ``args.output`` is set the results are dumped there as indented JSON
    and a confirmation is printed; otherwise they are printed in readable
    form, honouring ``args.verbose``.
    """
    destination = args.output
    if not destination:
        _print_formula_results(results, args.verbose)
        return

    with open(destination, "w") as f:
        json.dump(results, f, indent=2)
    print(f"Formula analysis saved to {args.output}")
def _print_formula_results(results: list[dict[str, Any]], verbose: bool) -> None:
    """Print each formula summary to stdout, followed by a blank line.

    The per-element atomic data is listed only when ``verbose`` is set and
    the summary actually contains an ``"atomic_data"`` entry.
    """
    for summary in results:
        print(f"Formula: {summary['formula']}")
        print(f"Elements: {', '.join(summary['elements'])}")
        print(f"Counts: {', '.join(str(count) for count in summary['counts'])}")
        print(f"Total atoms: {summary['total_atoms']}")

        show_atoms = verbose and "atomic_data" in summary
        if show_atoms:
            print("Atomic data:")
            for atom in summary["atomic_data"]:
                symbol_part = f" {atom['element']: >2}: "
                number_part = f"Z={atom['atomic_number']: >3}, "
                weight_part = f"MW={atom['atomic_weight']: >8.3f}"
                print(symbol_part + number_part + weight_part)

        # Blank separator line between formulas
        print()
[docs]
def cmd_atomic(args: Any) -> int:
    """Handle the 'atomic' command.

    Looks up atomic number and weight for every symbol in the comma-separated
    ``args.elements``. Symbols that cannot be resolved are reported on stderr
    and skipped. The data goes to ``args.output`` (JSON for a ``.json``
    suffix, CSV otherwise) or, without an output path, to the console.

    Returns:
        0 when at least one element was resolved, 1 otherwise or on error.
    """
    try:
        found: list[dict[str, Any]] = []
        for symbol in (part.strip() for part in args.elements.split(",")):
            try:
                number = get_atomic_number(symbol)
                weight = get_atomic_weight(symbol)
                found.append(
                    {
                        "element": symbol,
                        "atomic_number": number,
                        "atomic_weight": weight,
                    }
                )
            except Exception as e:
                print(f"Error getting atomic data for {symbol}: {e}", file=sys.stderr)

        if not found:
            print("No valid elements found", file=sys.stderr)
            return 1

        if not args.output:
            # Console table
            rule = "-" * 30
            print("Atomic Data:")
            print(rule)
            print(f"{'Element': >8} {'Z': >3} {'MW (u)': >10}")
            print(rule)
            for entry in found:
                print(
                    f"{entry['element']: >8} {entry['atomic_number']: >3} "
                    f"{entry['atomic_weight']: >10.3f}"
                )
            return 0

        # File output: the suffix selects the format
        if Path(args.output).suffix.lower() == ".json":
            with open(args.output, "w") as f:
                json.dump(found, f, indent=2)
        else:  # CSV
            import csv

            # "found" is guaranteed non-empty here, so the header is available.
            with open(args.output, "w", newline="", encoding="utf-8") as f:
                writer = csv.DictWriter(f, fieldnames=found[0].keys())
                writer.writeheader()
                writer.writerows(found)
        print(f"Atomic data saved to {args.output}")
        return 0
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        return 1
[docs]
def cmd_bragg(args: Any) -> int:
    """Handle the 'bragg' command.

    Calculates the Bragg angle for every d-spacing in the comma-separated
    ``args.dspacing``. The wavelength is taken from ``args.wavelength`` when
    that is set, otherwise derived from ``args.energy``. D-spacings that fail
    are reported with a warning and skipped. Results are written to
    ``args.output`` as CSV or printed as a table.

    Returns:
        0 when at least one angle was calculated, 1 otherwise or on error.
    """
    try:
        spacings = [float(token.strip()) for token in args.dspacing.split(",")]

        wavelength = (
            float(args.wavelength)
            if args.wavelength
            else energy_to_wavelength(float(args.energy))
        )

        rows: list[dict[str, Any]] = []
        for spacing in spacings:
            try:
                theta = bragg_angle(spacing, wavelength, args.order)
                rows.append(
                    {
                        "d_spacing_angstrom": spacing,
                        "wavelength_angstrom": wavelength,
                        "order": args.order,
                        "bragg_angle_degrees": theta,
                        "two_theta_degrees": 2 * theta,
                    }
                )
            except Exception as e:
                print(
                    f"Warning: Could not calculate Bragg angle for d={spacing}: {e}"
                )

        if not rows:
            print("No valid Bragg angles calculated", file=sys.stderr)
            return 1

        if not args.output:
            # Console table
            rule = "-" * 50
            print("Bragg Angle Calculations:")
            print(rule)
            print(f"{'d (Å)': >8} {'θ (°)': >8} {'2θ (°)': >8}")
            print(rule)
            for row in rows:
                print(
                    f"{row['d_spacing_angstrom']: >8.3f} "
                    f"{row['bragg_angle_degrees']: >8.3f} "
                    f"{row['two_theta_degrees']: >8.3f}"
                )
            return 0

        import csv

        # "rows" is guaranteed non-empty here, so the header is available.
        with open(args.output, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=rows[0].keys())
            writer.writeheader()
            writer.writerows(rows)
        print(f"Bragg angle results saved to {args.output}")
        return 0
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        return 1
[docs]
def cmd_list(args: Any) -> int:
    """Handle the 'list' command.

    Prints reference information selected by ``args.type``: ``"constants"``
    (values from the constants module), ``"fields"`` (result field names with
    descriptions) or ``"examples"`` (sample command lines). Any other value
    prints nothing.

    Returns:
        Always 0.
    """
    topic = args.type

    if topic == "constants":
        print("Physical Constants:")
        print("=" * 40)
        from xraylabtool import constants

        wanted_constants = (
            "THOMPSON",
            "SPEED_OF_LIGHT",
            "PLANCK",
            "ELEMENT_CHARGE",
            "AVOGADRO",
            "ENERGY_TO_WAVELENGTH_FACTOR",
            "PI",
            "TWO_PI",
        )
        for name in wanted_constants:
            # Names missing from the module are skipped silently.
            if not hasattr(constants, name):
                continue
            print(f"{name: <25}: {getattr(constants, name)}")

    elif topic == "fields":
        print("Available XRayResult Fields (new snake_case names):")
        print("=" * 60)
        field_table = (
            ("formula", "Chemical formula string"),
            ("molecular_weight_g_mol", "Molecular weight (g/mol)"),
            ("total_electrons", "Total electrons per molecule"),
            ("density_g_cm3", "Mass density (g/cm³)"),
            ("electron_density_per_ang3", "Electron density (electrons/ų)"),
            ("energy_kev", "X-ray energies (keV)"),
            ("wavelength_angstrom", "X-ray wavelengths (Å)"),
            ("dispersion_delta", "Dispersion coefficient δ"),
            ("absorption_beta", "Absorption coefficient β"),
            ("scattering_factor_f1", "Real atomic scattering factor"),
            ("scattering_factor_f2", "Imaginary atomic scattering factor"),
            ("critical_angle_degrees", "Critical angles (degrees)"),
            ("attenuation_length_cm", "Attenuation lengths (cm)"),
            ("real_sld_per_ang2", "Real SLD (Å⁻²)"),
            ("imaginary_sld_per_ang2", "Imaginary SLD (Å⁻²)"),
        )
        for field_name, meaning in field_table:
            print(f"{field_name: <25}: {meaning}")

    elif topic == "examples":
        print("CLI Usage Examples:")
        print("=" * 40)
        example_table = (
            ("Single material calculation", "xraylabtool calc SiO2 -e 10.0 -d 2.2"),
            ("Multiple energies", "xraylabtool calc Si -e 5.0,10.0,15.0 -d 2.33"),
            ("Energy range", "xraylabtool calc Al2O3 -e 5-15:11 -d 3.95"),
            ("Save to CSV", "xraylabtool calc SiO2 -e 10.0 -d 2.2 -o results.csv"),
            ("Batch processing", "xraylabtool batch materials.csv -o results.csv"),
            ("Unit conversion", "xraylabtool convert energy 10.0 --to wavelength"),
            ("Formula parsing", "xraylabtool formula SiO2 --verbose"),
            ("Bragg angles", "xraylabtool bragg -d 3.14 -e 8.0"),
            ("Install completion", "xraylabtool install-completion"),
        )
        for title, command_line in example_table:
            print(f"\n{title}:")
            print(f" {command_line}")

    return 0
[docs]
def cmd_install_completion(args: Any) -> int:
    """Handle the 'install-completion' command.

    Delegates to ``install_completion_main`` and returns its result. The
    completion module is imported on demand, inside this function.
    """
    from xraylabtool.interfaces.completion import (
        install_completion_main as run_install,
    )

    exit_code = run_install(args)
    return exit_code
[docs]
def cmd_uninstall_completion(args: Any) -> int:
    """Handle the 'uninstall-completion' command.

    Delegates to ``uninstall_completion_main`` and returns its result. The
    completion module is imported on demand, inside this function.
    """
    from xraylabtool.interfaces.completion import (
        uninstall_completion_main as run_uninstall,
    )

    exit_code = run_uninstall(args)
    return exit_code
[docs]
def cmd_completion(args: Any) -> int:
    """Handle the 'completion' command for the new completion system.

    Dispatches on ``args.completion_action``: ``install`` and ``uninstall``
    return 0 or 1 depending on what the installer reports; ``list``,
    ``status`` and ``info`` print information and return 0. A missing or
    unknown action, a keyboard interrupt, or any other error returns 1.
    """
    from xraylabtool.interfaces.completion_v2.installer import CompletionInstaller

    try:
        installer = CompletionInstaller()

        action = getattr(args, "completion_action", None)
        if action is None:
            print(
                "❌ No action specified. Use 'xraylabtool completion --help' for usage information."
            )
            return 1

        if action == "install":
            installed = installer.install(
                shell=getattr(args, "shell", None),
                target_env=getattr(args, "env", None),
                force=getattr(args, "force", False),
            )
            return 0 if installed else 1

        if action == "uninstall":
            removed = installer.uninstall(
                target_env=getattr(args, "env", None),
                all_envs=getattr(args, "all", False),
            )
            return 0 if removed else 1

        if action == "list":
            installer.list_environments()
            return 0

        if action == "status":
            installer.status()
            return 0

        if action == "info":
            from xraylabtool.interfaces.completion_v2.cli import show_completion_info

            show_completion_info()
            return 0

        print(f"❌ Unknown action: {args.completion_action}")
        return 1
    except KeyboardInterrupt:
        print("\n⚠️ Operation cancelled by user")
        return 1
    except Exception as e:
        print(f"❌ Error: {e}")
        return 1
[docs]
def cmd_compare(args: Any) -> int:
    """Handle the 'compare' command for material comparison.

    Each entry of ``args.materials`` must have the form ``formula,density``
    and at least two entries are required. The comparison is evaluated at the
    energies described by ``args.energy`` and, optionally, restricted to the
    comma-separated ``args.properties``. Output is either a text report
    (``args.report`` or format ``"report"``) or a table rendered as JSON, CSV
    or plain text, written to ``args.output`` or printed.

    Returns:
        0 on success, 1 on invalid input or any failure.
    """
    try:
        # Lazy imports for comparison functionality
        from xraylabtool.analysis import MaterialComparator

        # Parse "formula,density" specifications
        formulas = []
        densities = []
        for material_str in args.materials:
            try:
                pieces = material_str.split(",")
                if len(pieces) != 2:
                    raise ValueError(
                        f"Invalid material format: {material_str}. Expected"
                        " 'formula,density'"
                    )
                formula_text, density_text = pieces
                formula = formula_text.strip()
                density = float(density_text.strip())
                formulas.append(formula)
                densities.append(density)
            except ValueError as e:
                print(f"Error parsing material '{material_str}': {e}", file=sys.stderr)
                return 1

        if len(formulas) < 2:
            print(
                "Error: At least two materials required for comparison", file=sys.stderr
            )
            return 1

        # Parse energies
        try:
            energies = parse_energy_string(args.energy).tolist()
        except Exception as e:
            print(f"Error parsing energy range: {e}", file=sys.stderr)
            return 1

        # Parse the optional property selection
        properties = (
            [name.strip() for name in args.properties.split(",")]
            if args.properties
            else None
        )

        # Perform comparison
        comparator = MaterialComparator()
        try:
            result = comparator.compare_materials(
                formulas=formulas,
                densities=densities,
                energies=energies,
                properties=properties,
            )
        except Exception as e:
            print(f"Error during comparison: {e}", file=sys.stderr)
            return 1

        def json_payload() -> dict[str, Any]:
            """Gather the result attributes that make up the JSON output."""
            return {
                "materials": result.materials,
                "energies": result.energies,
                "properties": result.properties,
                "data": result.data,
                "summary_stats": result.summary_stats,
                "recommendations": result.recommendations,
            }

        # Report output
        if args.report or args.format == "report":
            report = comparator.generate_comparison_report(result)
            if args.output:
                with open(args.output, "w") as f:
                    f.write(report)
                print(f"Comparison report saved to {args.output}")
            else:
                print(report)
            return 0

        # Table output
        table = comparator.create_comparison_table(result)
        if args.output:
            if Path(args.output).suffix.lower() == ".json":
                payload = json_payload()
                with open(args.output, "w") as f:
                    json.dump(payload, f, indent=2)
            else:  # CSV
                table.to_csv(args.output, index=False)
            print(f"Comparison results saved to {args.output}")
        elif args.format == "json":
            print(json.dumps(json_payload(), indent=2))
        elif args.format == "csv":
            print(table.to_csv(index=False))
        else:  # table
            print(table.to_string(index=False))
        return 0
    except Exception as e:
        print(f"Comparison failed: {e}", file=sys.stderr)
        if getattr(args, "debug", False):
            import traceback

            traceback.print_exc()
        return 1
[docs]
def main() -> int:
    """Execute the main CLI application.

    Configures logging, parses the command line and dispatches to the handler
    of the selected subcommand. Argument errors are converted into a return
    value of 1 instead of exiting, while help/version exits (code 0) are
    re-raised. With ``--debug`` an exception escaping a handler is re-raised;
    otherwise it is reported on stderr.

    Returns:
        The handler's exit code, or 1 when no/unknown command was given,
        arguments were invalid, or a handler raised.
    """
    configure_logging()
    logger = get_logger("cli")
    log_environment(logger, component="cli")
    parser = create_parser()

    import time

    started = time.perf_counter()

    try:
        args = parser.parse_args()
    except SystemExit as e:
        # argparse exits with 0 for --help/--version: let that through.
        if e.code == 0:
            raise
        # Invalid arguments: return an error code instead of exiting.
        return 1

    debug_enabled = getattr(args, "debug", False)
    if debug_enabled:
        print(
            "🔍 Debug mode enabled - detailed error information will be shown",
            file=sys.stderr,
        )

    # The --install-completion flag is served before any subcommand handling.
    if getattr(args, "install_completion", None) is not None:
        from xraylabtool.interfaces.completion import install_completion_main

        class MockArgs:
            """Namespace shaped like the install-completion subcommand's args."""

            def __init__(
                self,
                shell_type: str | None,
                test: bool = False,
                system: bool = False,
                uninstall: bool = False,
            ) -> None:
                # "auto" means: let the installer detect the shell.
                self.shell = None if shell_type == "auto" else shell_type
                self.system = system
                # User installation is the default unless system is requested.
                self.user = not system
                self.uninstall = uninstall
                self.test = test

        mock_args = MockArgs(
            args.install_completion,
            test=getattr(args, "test", False),
            system=getattr(args, "system", False),
            uninstall=getattr(args, "uninstall", False),
        )
        return install_completion_main(mock_args)  # type: ignore[arg-type]

    # Without a subcommand there is nothing to run: show the help text.
    if not args.command:
        parser.print_help()
        return 1

    command_handlers = {
        "calc": cmd_calc,
        "batch": cmd_batch,
        "compare": cmd_compare,
        "convert": cmd_convert,
        "formula": cmd_formula,
        "atomic": cmd_atomic,
        "bragg": cmd_bragg,
        "list": cmd_list,
        "completion": cmd_completion,
        "install-completion": cmd_install_completion,
        "uninstall-completion": cmd_uninstall_completion,
    }
    handler = command_handlers.get(args.command)
    if not handler:
        print(f"Unknown command: {args.command}", file=sys.stderr)
        return 1

    logger.info("Starting command", extra={"command": args.command})
    try:
        rc = handler(args)
    except Exception as exc:
        logger.exception("Command failed", extra={"command": args.command})
        if getattr(args, "debug", False):
            raise
        print(f"Error running {args.command}: {exc}", file=sys.stderr)
        return 1

    elapsed = time.perf_counter() - started
    logger.info(
        "Command finished",
        extra={
            "command": args.command,
            "status": rc,
            "duration_s": round(elapsed, 4),
        },
    )
    return rc
if __name__ == "__main__":
    # Hand the CLI's return code to the shell as the process exit status.
    exit_status = main()
    sys.exit(exit_status)