Merge pull request #45933 from ionutbalutoiu/qa-add-rbd-wnbd-tests

qa: add basic Ceph on Windows integration test
This commit is contained in:
David Galloway 2022-04-19 16:50:24 -04:00 committed by GitHub
commit 0f4be50475
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 1166 additions and 0 deletions

1
qa/suites/windows/.qa Symbolic link
View File

@ -0,0 +1 @@
../.qa/

View File

1
qa/suites/windows/basic/.qa Symbolic link
View File

@ -0,0 +1 @@
../.qa/

View File

@ -0,0 +1 @@
../.qa/

View File

@ -0,0 +1 @@
.qa/clusters/fixed-1.yaml

View File

@ -0,0 +1 @@
../.qa/

View File

@ -0,0 +1,3 @@
tasks:
- install:
- ceph:

View File

@ -0,0 +1 @@
../.qa/

View File

@ -0,0 +1,6 @@
tasks:
- workunit:
clients:
client.0:
- windows/libvirt_vm/setup.sh
- windows/run-rbd-wnbd-tests.sh

View File

@ -0,0 +1 @@
.qa/distros/supported/ubuntu_latest.yaml

View File

@ -0,0 +1,157 @@
<?xml version="1.0" encoding="utf-8"?>
<unattend xmlns="urn:schemas-microsoft-com:unattend">
<settings pass="windowsPE">
<component name="Microsoft-Windows-International-Core-WinPE" processorArchitecture="amd64" publicKeyToken="31bf3856ad364e35" language="neutral" versionScope="nonSxS" xmlns:wcm="http://schemas.microsoft.com/WMIConfig/2002/State" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<SetupUILanguage>
<UILanguage>en-US</UILanguage>
</SetupUILanguage>
<SystemLocale>en-US</SystemLocale>
<UILanguage>en-US</UILanguage>
<UserLocale>en-US</UserLocale>
</component>
<component name="Microsoft-Windows-Setup" processorArchitecture="amd64" publicKeyToken="31bf3856ad364e35" language="neutral" versionScope="nonSxS" xmlns:wcm="http://schemas.microsoft.com/WMIConfig/2002/State" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<DiskConfiguration>
<WillShowUI>OnError</WillShowUI>
<Disk wcm:action="add">
<CreatePartitions>
<CreatePartition wcm:action="add">
<Order>1</Order>
<Size>100</Size>
<Type>Primary</Type>
</CreatePartition>
<CreatePartition wcm:action="add">
<Order>2</Order>
<Extend>true</Extend>
<Type>Primary</Type>
</CreatePartition>
</CreatePartitions>
<ModifyPartitions>
<ModifyPartition wcm:action="add">
<Active>true</Active>
<Label>Boot</Label>
<Format>NTFS</Format>
<Order>1</Order>
<PartitionID>1</PartitionID>
</ModifyPartition>
<ModifyPartition wcm:action="add">
<Format>NTFS</Format>
<Order>2</Order>
<PartitionID>2</PartitionID>
<Label>System</Label>
</ModifyPartition>
</ModifyPartitions>
<DiskID>0</DiskID>
<WillWipeDisk>true</WillWipeDisk>
</Disk>
</DiskConfiguration>
<ImageInstall>
<OSImage>
<InstallTo>
<PartitionID>2</PartitionID>
<DiskID>0</DiskID>
</InstallTo>
<InstallToAvailablePartition>false</InstallToAvailablePartition>
<WillShowUI>OnError</WillShowUI>
<InstallFrom>
<MetaData wcm:action="add">
<Key>/IMAGE/NAME</Key>
<Value>Windows Server 2019 SERVERSTANDARDCORE</Value>
</MetaData>
</InstallFrom>
</OSImage>
</ImageInstall>
<UserData>
<!-- Product Key from http://technet.microsoft.com/en-us/library/jj612867.aspx -->
<ProductKey>
<!-- Do not uncomment the Key element if you are using trial ISOs -->
<!-- You must uncomment the Key element (and optionally insert your own key) if you are using retail or volume license ISOs -->
<!-- <Key></Key> -->
<WillShowUI>OnError</WillShowUI>
</ProductKey>
<AcceptEula>true</AcceptEula>
</UserData>
</component>
<component name="Microsoft-Windows-PnpCustomizationsWinPE" processorArchitecture="amd64" publicKeyToken="31bf3856ad364e35" language="neutral" versionScope="nonSxS" xmlns:wcm="http://schemas.microsoft.com/WMIConfig/2002/State" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<DriverPaths>
<PathAndCredentials wcm:action="add" wcm:keyValue="1">
<Path>E:\NetKVM\2k19\amd64\</Path>
</PathAndCredentials>
<PathAndCredentials wcm:action="add" wcm:keyValue="2">
<Path>E:\viostor\2k19\amd64\</Path>
</PathAndCredentials>
<PathAndCredentials wcm:action="add" wcm:keyValue="3">
<Path>E:\vioserial\2k19\amd64\</Path>
</PathAndCredentials>
</DriverPaths>
</component>
</settings>
<settings pass="oobeSystem">
<component name="Microsoft-Windows-Shell-Setup" processorArchitecture="amd64" publicKeyToken="31bf3856ad364e35" language="neutral" versionScope="nonSxS" xmlns:wcm="http://schemas.microsoft.com/WMIConfig/2002/State" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<VisualEffects>
<FontSmoothing>ClearType</FontSmoothing>
</VisualEffects>
<UserAccounts>
<!--
Password to be used only during initial provisioning.
Must be reset with final Sysprep.
-->
<AdministratorPassword>
<Value>Passw0rd</Value>
<PlainText>true</PlainText>
</AdministratorPassword>
</UserAccounts>
<AutoLogon>
<Password>
<Value>Passw0rd</Value>
<PlainText>true</PlainText>
</Password>
<Enabled>true</Enabled>
<Username>Administrator</Username>
</AutoLogon>
<ComputerName>*</ComputerName>
<OOBE>
<NetworkLocation>Work</NetworkLocation>
<HideEULAPage>true</HideEULAPage>
<ProtectYourPC>3</ProtectYourPC>
<SkipMachineOOBE>true</SkipMachineOOBE>
<SkipUserOOBE>true</SkipUserOOBE>
</OOBE>
<FirstLogonCommands>
<SynchronousCommand wcm:action="add">
<CommandLine>%SystemRoot%\System32\WindowsPowerShell\v1.0\powershell -NoLogo -NonInteractive -ExecutionPolicy RemoteSigned -File A:\first-logon.ps1</CommandLine>
<Order>1</Order>
</SynchronousCommand>
</FirstLogonCommands>
</component>
</settings>
<settings pass="specialize">
<component name="Microsoft-Windows-Shell-Setup" processorArchitecture="amd64" publicKeyToken="31bf3856ad364e35" language="neutral" versionScope="nonSxS" xmlns:wcm="http://schemas.microsoft.com/WMIConfig/2002/State" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<TimeZone>UTC</TimeZone>
<ComputerName>*</ComputerName>
</component>
</settings>
</unattend>

View File

@ -0,0 +1,42 @@
$ErrorActionPreference = "Stop"
. "${PSScriptRoot}\utils.ps1"
$VIRTIO_WIN_PATH = "E:\"
# Install QEMU quest agent
Write-Output "Installing QEMU guest agent"
$p = Start-Process -FilePath "msiexec.exe" -ArgumentList @("/i", "${VIRTIO_WIN_PATH}\guest-agent\qemu-ga-x86_64.msi", "/qn") -NoNewWindow -PassThru -Wait
if($p.ExitCode) {
Throw "The QEMU guest agent installation failed. Exit code: $($p.ExitCode)"
}
Write-Output "Successfully installed QEMU guest agent"
# Install OpenSSH server
Start-ExecuteWithRetry {
Get-WindowsCapability -Online -Name OpenSSH* | Add-WindowsCapability -Online
}
# Start OpenSSH server
Set-Service -Name "sshd" -StartupType Automatic
Start-Service -Name "sshd"
# Set PowerShell as default SSH shell
New-ItemProperty -PropertyType String -Force -Name DefaultShell -Path "HKLM:\SOFTWARE\OpenSSH" -Value (Get-Command powershell.exe).Source
# Create SSH firewall rule
New-NetFirewallRule -Name "sshd" -DisplayName 'OpenSSH Server (sshd)' -Enabled True -Direction Inbound -Protocol TCP -Action Allow -LocalPort 22
# Authorize the SSH key
$authorizedKeysFile = Join-Path $env:ProgramData "ssh\administrators_authorized_keys"
Set-Content -Path $authorizedKeysFile -Value (Get-Content "${PSScriptRoot}\id_rsa.pub") -Encoding ascii
$acl = Get-Acl $authorizedKeysFile
$acl.SetAccessRuleProtection($true, $false)
$administratorsRule = New-Object system.security.accesscontrol.filesystemaccessrule("Administrators", "FullControl", "Allow")
$systemRule = New-Object system.security.accesscontrol.filesystemaccessrule("SYSTEM", "FullControl", "Allow")
$acl.SetAccessRule($administratorsRule)
$acl.SetAccessRule($systemRule)
$acl | Set-Acl
# Reboot the machine to complete first logon process
Restart-Computer -Force -Confirm:$false

View File

@ -0,0 +1,43 @@
$ErrorActionPreference = "Stop"
$ProgressPreference = "SilentlyContinue"
$PYTHON3_URL = "https://www.python.org/ftp/python/3.10.4/python-3.10.4-amd64.exe"
$FIO_URL = "https://bsdio.com/fio/releases/fio-3.27-x64.msi"
$VC_REDIST_URL = "https://aka.ms/vs/17/release/vc_redist.x64.exe"
. "${PSScriptRoot}\utils.ps1"
function Install-VCRedist {
Write-Output "Installing Visual Studio Redistributable x64"
Install-Tool -URL $VC_REDIST_URL -Params @("/quiet", "/norestart")
Write-Output "Successfully installed Visual Studio Redistributable x64"
}
function Install-Python3 {
Write-Output "Installing Python3"
Install-Tool -URL $PYTHON3_URL -Params @("/quiet", "InstallAllUsers=1", "PrependPath=1")
Add-ToPathEnvVar -Path @("${env:ProgramFiles}\Python310\", "${env:ProgramFiles}\Python310\Scripts\")
Write-Output "Installing pip dependencies"
Start-ExecuteWithRetry {
Invoke-CommandLine "pip3.exe" "install prettytable"
}
Write-Output "Successfully installed Python3"
}
function Install-FIO {
Write-Output "Installing FIO"
Install-Tool -URL $FIO_URL -Params @("/qn", "/l*v", "$env:TEMP\fio-install.log", "/norestart")
Write-Output "Successfully installed FIO"
}
Install-VCRedist
Install-Python3
Install-FIO
# Pre-append WNBD and Ceph to PATH
Add-ToPathEnvVar -Path @(
"${env:SystemDrive}\wnbd\binaries",
"${env:SystemDrive}\ceph")
# This will refresh the PATH for new SSH sessions
Restart-Service -Force -Name "sshd"

View File

@ -0,0 +1,162 @@
#!/usr/bin/env bash
set -ex
WINDOWS_SERVER_2019_ISO_URL=${WINDOWS_SERVER_2019_ISO_URL:-"https://software-download.microsoft.com/download/pr/17763.737.190906-2324.rs5_release_svc_refresh_SERVER_EVAL_x64FRE_en-us_1.iso"}
VIRTIO_WIN_ISO_URL=${VIRTIO_WIN_ISO_URL:-"https://fedorapeople.org/groups/virt/virtio-win/direct-downloads/stable-virtio/virtio-win.iso"}
DIR="$(cd $(dirname "${BASH_SOURCE[0]}") && pwd)"
# Use build_utils.sh from ceph-build
curl --retry-max-time 30 --retry 10 -L -o ${DIR}/build_utils.sh https://raw.githubusercontent.com/ceph/ceph-build/master/scripts/build_utils.sh
source ${DIR}/build_utils.sh
# Helper function to restart the Windows VM
function restart_windows_vm() {
echo "Restarting Windows VM"
ssh_exec "cmd.exe /c 'shutdown.exe /r /t 0 & sc.exe stop sshd'"
SECONDS=0
TIMEOUT=${1:-600}
while true; do
if [[ $SECONDS -gt $TIMEOUT ]]; then
echo "Timeout waiting for the VM to start"
exit 1
fi
ssh_exec hostname || {
echo "Cannot execute SSH commands yet"
sleep 10
continue
}
break
done
echo "Windows VM restarted"
}
# Install libvirt with KVM
retrycmd_if_failure 5 0 5m sudo apt-get update
retrycmd_if_failure 5 0 10m sudo apt-get install -y qemu-kvm libvirt-daemon-system libvirt-clients virtinst
# Download ISO images
echo "Downloading virtio-win ISO"
retrycmd_if_failure 5 0 30m curl -C - -L $VIRTIO_WIN_ISO_URL -o ${DIR}/virtio-win.iso
echo "Downloading Windows Server 2019 ISO"
retrycmd_if_failure 5 0 60m curl -C - -L $WINDOWS_SERVER_2019_ISO_URL -o ${DIR}/windows-server-2019.iso
# Create virtual floppy image with the unattended instructions to install Windows Server 2019
echo "Creating floppy image"
qemu-img create -f raw ${DIR}/floppy.img 1440k
mkfs.msdos -s 1 ${DIR}/floppy.img
mkdir ${DIR}/floppy
sudo mount ${DIR}/floppy.img ${DIR}/floppy
ssh-keygen -b 2048 -t rsa -f ${DIR}/id_rsa -q -N ""
sudo cp \
${DIR}/autounattend.xml \
${DIR}/first-logon.ps1 \
${DIR}/id_rsa.pub \
${DIR}/utils.ps1 \
${DIR}/setup.ps1 \
${DIR}/floppy/
sudo umount ${DIR}/floppy
rmdir ${DIR}/floppy
echo "Starting libvirt VM"
qemu-img create -f qcow2 ${DIR}/ceph-win-ltsc2019.qcow2 50G
VM_NAME="ceph-win-ltsc2019"
sudo virt-install \
--name $VM_NAME \
--os-variant win2k19 \
--boot hd,cdrom \
--virt-type kvm \
--graphics spice \
--cpu host \
--vcpus 4 \
--memory 4096 \
--disk ${DIR}/floppy.img,device=floppy \
--disk ${DIR}/ceph-win-ltsc2019.qcow2,bus=virtio \
--disk ${DIR}/windows-server-2019.iso,device=cdrom \
--disk ${DIR}/virtio-win.iso,device=cdrom \
--network network=default,model=virtio \
--controller type=virtio-serial \
--channel unix,target_type=virtio,name=org.qemu.guest_agent.0 \
--noautoconsol
export SSH_USER="administrator"
export SSH_KNOWN_HOSTS_FILE="${DIR}/known_hosts"
export SSH_KEY="${DIR}/id_rsa"
SECONDS=0
TIMEOUT=1800
SLEEP_SECS=30
while true; do
if [[ $SECONDS -gt $TIMEOUT ]]; then
echo "Timeout waiting for the VM to start"
exit 1
fi
VM_IP=$(sudo virsh domifaddr --source agent --interface Ethernet --full $VM_NAME | grep ipv4 | awk '{print $4}' | cut -d '/' -f1) || {
echo "Retrying in $SLEEP_SECS seconds"
sleep $SLEEP_SECS
continue
}
ssh-keyscan -H $VM_IP &> $SSH_KNOWN_HOSTS_FILE || {
echo "SSH is not reachable yet"
sleep $SLEEP_SECS
continue
}
SSH_ADDRESS=$VM_IP ssh_exec hostname || {
echo "Cannot execute SSH commands yet"
sleep $SLEEP_SECS
continue
}
break
done
export SSH_ADDRESS=$VM_IP
scp_upload ${DIR}/utils.ps1 /utils.ps1
scp_upload ${DIR}/setup.ps1 /setup.ps1
SSH_TIMEOUT=1h ssh_exec /setup.ps1
cd $DIR
# Get the helper script to download Chacra builds
retrycmd_if_failure 10 5 1m curl -L -o ./get-chacra-bin.py https://raw.githubusercontent.com/ceph/ceph-win32-tests/master/get-bin.py
chmod +x ./get-chacra-bin.py
# Download latest WNBD build from Chacra
retrycmd_if_failure 10 0 10m ./get-chacra-bin.py --project wnbd --filename wnbd.zip
scp_upload wnbd.zip /wnbd.zip
ssh_exec tar.exe xzvf /wnbd.zip -C /
# Install WNBD driver
ssh_exec Import-Certificate -FilePath /wnbd/driver/wnbd.cer -Cert Cert:\\LocalMachine\\Root
ssh_exec Import-Certificate -FilePath /wnbd/driver/wnbd.cer -Cert Cert:\\LocalMachine\\TrustedPublisher
ssh_exec /wnbd/binaries/wnbd-client.exe install-driver /wnbd/driver/wnbd.inf
restart_windows_vm
ssh_exec wnbd-client.exe -v
# Download Ceph Windows build from Chacra
CEPH_REPO_FILE="/etc/apt/sources.list.d/ceph.list"
PROJECT=$(cat $CEPH_REPO_FILE | cut -d ' ' -f3 | tr '\/', ' ' | awk '{print $4}')
BRANCH=$(cat $CEPH_REPO_FILE | cut -d ' ' -f3 | tr '\/', ' ' | awk '{print $5}')
SHA1=$(cat $CEPH_REPO_FILE | cut -d ' ' -f3 | tr '\/', ' ' | awk '{print $6}')
retrycmd_if_failure 10 0 10m ./get-chacra-bin.py --project $PROJECT --branchname $BRANCH --sha1 $SHA1 --filename ceph.zip
# Install Ceph on Windows
SSH_TIMEOUT=5m scp_upload ./ceph.zip /ceph.zip
SSH_TIMEOUT=10m ssh_exec tar.exe xzvf /ceph.zip -C /
ssh_exec "New-Service -Name ceph-rbd -BinaryPathName 'c:\ceph\rbd-wnbd.exe service'"
ssh_exec Start-Service -Name ceph-rbd
ssh_exec rbd.exe -v
# Setup Ceph configs and directories
ssh_exec mkdir -force /etc/ceph, /var/run/ceph, /var/log/ceph
for i in $(ls /etc/ceph); do
scp_upload /etc/ceph/$i /etc/ceph/$i
done
cat << EOF > ${DIR}/connection_info.sh
export SSH_USER="${SSH_USER}"
export SSH_KNOWN_HOSTS_FILE="${SSH_KNOWN_HOSTS_FILE}"
export SSH_KEY="${SSH_KEY}"
export SSH_ADDRESS="${SSH_ADDRESS}"
EOF
echo "Windows Server 2019 libvirt testing VM is ready"

View File

@ -0,0 +1,130 @@
function Invoke-CommandLine {
Param(
[Parameter(Mandatory=$true)]
[String]$Command,
[String]$Arguments,
[Int[]]$AllowedExitCodes=@(0)
)
& $Command $Arguments.Split(" ")
if($LASTEXITCODE -notin $AllowedExitCodes) {
Throw "$Command $Arguments returned a non zero exit code ${LASTEXITCODE}."
}
}
function Start-ExecuteWithRetry {
Param(
[Parameter(Mandatory=$true)]
[ScriptBlock]$ScriptBlock,
[Int]$MaxRetryCount=10,
[Int]$RetryInterval=3,
[String]$RetryMessage,
[Array]$ArgumentList=@()
)
$currentErrorActionPreference = $ErrorActionPreference
$ErrorActionPreference = "Continue"
$retryCount = 0
while ($true) {
try {
$res = Invoke-Command -ScriptBlock $ScriptBlock -ArgumentList $ArgumentList
$ErrorActionPreference = $currentErrorActionPreference
return $res
} catch [System.Exception] {
$retryCount++
if ($retryCount -gt $MaxRetryCount) {
$ErrorActionPreference = $currentErrorActionPreference
Throw $_
} else {
$prefixMsg = "Retry(${retryCount}/${MaxRetryCount})"
if($RetryMessage) {
Write-Host "${prefixMsg} - $RetryMessage"
} elseif($_) {
Write-Host "${prefixMsg} - $($_.ToString())"
}
Start-Sleep $RetryInterval
}
}
}
}
function Start-FileDownload {
Param(
[Parameter(Mandatory=$true)]
[String]$URL,
[Parameter(Mandatory=$true)]
[String]$Destination,
[Int]$RetryCount=10
)
Write-Output "Downloading $URL to $Destination"
Start-ExecuteWithRetry `
-ScriptBlock { Invoke-CommandLine -Command "curl.exe" -Arguments "-L -s -o $Destination $URL" } `
-MaxRetryCount $RetryCount `
-RetryMessage "Failed to download '${URL}'. Retrying"
Write-Output "Successfully downloaded."
}
function Add-ToPathEnvVar {
Param(
[Parameter(Mandatory=$true)]
[String[]]$Path,
[Parameter(Mandatory=$false)]
[ValidateSet([System.EnvironmentVariableTarget]::User, [System.EnvironmentVariableTarget]::Machine)]
[System.EnvironmentVariableTarget]$Target=[System.EnvironmentVariableTarget]::Machine
)
$pathEnvVar = [Environment]::GetEnvironmentVariable("PATH", $Target).Split(';')
$currentSessionPath = $env:PATH.Split(';')
foreach($p in $Path) {
if($p -notin $pathEnvVar) {
$pathEnvVar += $p
}
if($p -notin $currentSessionPath) {
$currentSessionPath += $p
}
}
$env:PATH = $currentSessionPath -join ';'
$newPathEnvVar = $pathEnvVar -join ';'
[Environment]::SetEnvironmentVariable("PATH", $newPathEnvVar, $Target)
}
function Install-Tool {
[CmdletBinding(DefaultParameterSetName = "URL")]
Param(
[Parameter(Mandatory=$true, ParameterSetName = "URL")]
[String]$URL,
[Parameter(Mandatory=$true, ParameterSetName = "LocalPath")]
[String]$LocalPath,
[Parameter(ParameterSetName = "URL")]
[Parameter(ParameterSetName = "LocalPath")]
[String[]]$Params=@(),
[Parameter(ParameterSetName = "URL")]
[Parameter(ParameterSetName = "LocalPath")]
[Int[]]$AllowedExitCodes=@(0)
)
PROCESS {
$installerPath = $LocalPath
if($PSCmdlet.ParameterSetName -eq "URL") {
$installerPath = Join-Path $env:TEMP $URL.Split('/')[-1]
Start-FileDownload -URL $URL -Destination $installerPath
}
Write-Output "Installing ${installerPath}"
$kwargs = @{
"FilePath" = $installerPath
"ArgumentList" = $Params
"NoNewWindow" = $true
"PassThru" = $true
"Wait" = $true
}
if((Get-ChildItem $installerPath).Extension -eq '.msi') {
$kwargs["FilePath"] = "msiexec.exe"
$kwargs["ArgumentList"] = @("/i", $installerPath) + $Params
}
$p = Start-Process @kwargs
if($p.ExitCode -notin $AllowedExitCodes) {
Throw "Installation failed. Exit code: $($p.ExitCode)"
}
if($PSCmdlet.ParameterSetName -eq "URL") {
Start-ExecuteWithRetry `
-ScriptBlock { Remove-Item -Force -Path $installerPath -ErrorAction Stop } `
-RetryMessage "Failed to remove ${installerPath}. Retrying"
}
}
}

View File

@ -0,0 +1,13 @@
#!/usr/bin/env bash
set -ex
DIR="$(cd $(dirname "${BASH_SOURCE[0]}") && pwd)"
source ${DIR}/libvirt_vm/build_utils.sh
source ${DIR}/libvirt_vm/connection_info.sh
# Run the rbd-wnbd tests
scp_upload ${DIR}/test_rbd_wnbd.py /test_rbd_wnbd.py
ssh_exec python.exe /test_rbd_wnbd.py --test-name RbdTest
ssh_exec python.exe /test_rbd_wnbd.py --test-name RbdFioTest
ssh_exec python.exe /test_rbd_wnbd.py --test-name RbdStampTest

View File

@ -0,0 +1,603 @@
import argparse
import collections
import json
import logging
import math
import prettytable
import random
import subprocess
import time
import threading
import typing
import uuid
from concurrent import futures
LOG = logging.getLogger()
parser = argparse.ArgumentParser(description='rbd-wnbd tests')
parser.add_argument('--test-name',
help='The test to be run.',
default="RbdFioTest")
parser.add_argument('--iterations',
help='Total number of test iterations',
default=1, type=int)
parser.add_argument('--concurrency',
help='The number of tests to run in parallel',
default=4, type=int)
parser.add_argument('--fio-iterations',
help='Total number of benchmark iterations per disk.',
default=1, type=int)
parser.add_argument('--fio-workers',
help='Total number of fio workers per disk.',
default=1, type=int)
parser.add_argument('--fio-depth',
help='The number of concurrent asynchronous operations '
'executed per disk',
default=64, type=int)
parser.add_argument('--bs',
help='Benchmark block size.',
default="2M")
parser.add_argument('--op',
help='Benchmark operation.',
default="read")
parser.add_argument('--image-prefix',
help='The image name prefix.',
default="cephTest-")
parser.add_argument('--image-size-mb',
help='The image size in megabytes.',
default=1024, type=int)
parser.add_argument('--map-timeout',
help='Image map timeout.',
default=60, type=int)
parser.add_argument('--verbose', action='store_true',
help='Print info messages.')
parser.add_argument('--debug', action='store_true',
help='Print debug messages.')
parser.add_argument('--stop-on-error', action='store_true',
help='Stop testing when hitting errors.')
parser.add_argument('--skip-cleanup-on-error', action='store_true',
help='Skip cleanup when hitting errors.')
class CephTestException(Exception):
msg_fmt = "An exception has been encountered."
def __init__(self, message: str = None, **kwargs):
self.kwargs = kwargs
if not message:
message = self.msg_fmt % kwargs
self.message = message
super(CephTestException, self).__init__(message)
class CommandFailed(CephTestException):
msg_fmt = (
"Command failed: %(command)s. "
"Return code: %(returncode)s. "
"Stdout: %(stdout)s. Stderr: %(stderr)s.")
def setup_logging(log_level: int = logging.INFO):
handler = logging.StreamHandler()
handler.setLevel(log_level)
log_fmt = '[%(asctime)s] %(levelname)s - %(message)s'
formatter = logging.Formatter(log_fmt)
handler.setFormatter(formatter)
LOG.addHandler(handler)
LOG.setLevel(logging.DEBUG)
def execute(*args, **kwargs):
LOG.debug("Executing: %s", args)
result = subprocess.run(
args,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
**kwargs)
LOG.debug("Command %s returned %d.", args, result.returncode)
if result.returncode:
exc = CommandFailed(
command=args, returncode=result.returncode,
stdout=result.stdout, stderr=result.stderr)
LOG.error(exc)
raise exc
return result
def array_stats(array: list):
mean = sum(array) / len(array) if len(array) else 0
variance = (sum((i - mean) ** 2 for i in array) / len(array)
if len(array) else 0)
std_dev = math.sqrt(variance)
sorted_array = sorted(array)
return {
'min': min(array) if len(array) else 0,
'max': max(array) if len(array) else 0,
'sum': sum(array) if len(array) else 0,
'mean': mean,
'median': sorted_array[len(array) // 2] if len(array) else 0,
'max_90': sorted_array[int(len(array) * 0.9)] if len(array) else 0,
'min_90': sorted_array[int(len(array) * 0.1)] if len(array) else 0,
'variance': variance,
'std_dev': std_dev,
'count': len(array)
}
class Tracer:
data: collections.OrderedDict = collections.OrderedDict()
lock = threading.Lock()
@classmethod
def trace(cls, func):
def wrapper(*args, **kwargs):
tstart = time.time()
exc_str = None
# Preserve call order
with cls.lock:
if func.__qualname__ not in cls.data:
cls.data[func.__qualname__] = list()
try:
return func(*args, **kwargs)
except Exception as exc:
exc_str = str(exc)
raise
finally:
tend = time.time()
with cls.lock:
cls.data[func.__qualname__] += [{
"duration": tend - tstart,
"error": exc_str,
}]
return wrapper
@classmethod
def get_results(cls):
stats = collections.OrderedDict()
for f in cls.data.keys():
stats[f] = array_stats([i['duration'] for i in cls.data[f]])
errors = []
for i in cls.data[f]:
if i['error']:
errors.append(i['error'])
stats[f]['errors'] = errors
return stats
@classmethod
def print_results(cls):
r = cls.get_results()
table = prettytable.PrettyTable(title="Duration (s)")
table.field_names = [
"function", "min", "max", "total",
"mean", "median", "std_dev",
"max 90%", "min 90%", "count", "errors"]
table.float_format = ".4"
for f, s in r.items():
table.add_row([f, s['min'], s['max'], s['sum'],
s['mean'], s['median'], s['std_dev'],
s['max_90'], s['min_90'],
s['count'], len(s['errors'])])
print(table)
class RbdImage(object):
def __init__(self,
name: str,
size_mb: int,
is_shared: bool = True,
disk_number: int = -1,
mapped: bool = False):
self.name = name
self.size_mb = size_mb
self.is_shared = is_shared
self.disk_number = disk_number
self.mapped = mapped
self.removed = False
@classmethod
@Tracer.trace
def create(cls,
name: str,
size_mb: int = 1024,
is_shared: bool = True):
LOG.info("Creating image: %s. Size: %s.", name, "%sM" % size_mb)
cmd = ["rbd", "create", name, "--size", "%sM" % size_mb]
if is_shared:
cmd += ["--image-shared"]
execute(*cmd)
return RbdImage(name, size_mb, is_shared)
@Tracer.trace
def get_disk_number(self,
timeout: int = 60,
retry_interval: int = 2):
tstart: float = time.time()
elapsed: float = 0
LOG.info("Retrieving disk number: %s", self.name)
while elapsed < timeout or not timeout:
result = execute("rbd-wnbd", "show", self.name, "--format=json")
disk_info = json.loads(result.stdout)
disk_number = disk_info["disk_number"]
if disk_number > 0:
LOG.debug("Image %s disk number: %d", self.name, disk_number)
return disk_number
elapsed = time.time() - tstart
if elapsed > 10:
level = logging.WARNING
else:
level = logging.DEBUG
LOG.log(
level,
"Could not get disk number: %s. Time elapsed: %d. Timeout: %d",
self.name, elapsed, timeout)
time.sleep(retry_interval)
elapsed = time.time() - tstart
raise CephTestException(
f"Could not get disk number for {self.name}. "
f"Time elapsed: {elapsed}. Timeout: {timeout}")
@Tracer.trace
def _wait_for_disk(self,
timeout: int = 60,
retry_interval: int = 2):
tstart: float = time.time()
elapsed: float = 0
LOG.debug("Waiting for disk to be accessible: %s %s",
self.name, self.path)
while elapsed < timeout or not timeout:
try:
with open(self.path, 'rb') as _:
return
except FileNotFoundError:
pass
elapsed = time.time() - tstart
if elapsed > 10:
level = logging.WARNING
else:
level = logging.DEBUG
LOG.log(level,
"The mapped disk isn't accessible yet: %s %s. "
"Time elapsed: %d. Timeout: %d",
self.name, self.path, elapsed, timeout)
time.sleep(retry_interval)
elapsed = time.time() - tstart
raise CephTestException(
f"The mapped disk isn't accessible yet: {self.name} {self.path}. "
f"Time elapsed: {elapsed}. Timeout: {timeout}")
@property
def path(self):
return f"\\\\.\\PhysicalDrive{self.disk_number}"
@Tracer.trace
def map(self, timeout: int = 60):
LOG.info("Mapping image: %s", self.name)
tstart = time.time()
execute("rbd-wnbd", "map", self.name)
self.mapped = True
self.disk_number = self.get_disk_number(timeout=timeout)
elapsed = time.time() - tstart
self._wait_for_disk(timeout=timeout - elapsed, )
@Tracer.trace
def unmap(self):
if self.mapped:
LOG.info("Unmapping image: %s", self.name)
execute("rbd-wnbd", "unmap", self.name)
self.mapped = False
@Tracer.trace
def remove(self):
if not self.removed:
LOG.info("Removing image: %s", self.name)
execute("rbd", "rm", self.name)
self.removed = True
def cleanup(self):
try:
self.unmap()
finally:
self.remove()
class RbdTest(object):
image: RbdImage
def __init__(self,
image_prefix: str = "cephTest-",
image_size_mb: int = 1024,
map_timeout: int = 60,
**kwargs):
self.image_size_mb = image_size_mb
self.image_name = image_prefix + str(uuid.uuid4())
self.map_timeout = map_timeout
@Tracer.trace
def initialize(self):
self.image = RbdImage.create(
self.image_name,
self.image_size_mb)
self.image.map(timeout=self.map_timeout)
def run(self):
pass
def cleanup(self):
if self.image:
self.image.cleanup()
@classmethod
def print_results(cls,
title: str = "Test results",
description: str = None):
pass
class RbdFioTest(RbdTest):
data: typing.List[typing.Dict[str, str]] = []
lock = threading.Lock()
def __init__(self,
*args,
fio_size_mb: int = None,
iterations: int = 1,
workers: int = 1,
bs: str = "2M",
iodepth: int = 64,
op: str = "read",
**kwargs):
super(RbdFioTest, self).__init__(*args, **kwargs)
self.fio_size_mb = fio_size_mb or self.image_size_mb
self.iterations = iterations
self.workers = workers
self.bs = bs
self.iodepth = iodepth
self.op = op
def process_result(self, raw_fio_output: str):
result = json.loads(raw_fio_output)
with self.lock:
for job in result["jobs"]:
self.data.append({
'error': job['error'],
'io_bytes': job[self.op]['io_bytes'],
'bw_bytes': job[self.op]['bw_bytes'],
'runtime': job[self.op]['runtime'] / 1000, # seconds
'total_ios': job[self.op]['short_ios'],
'short_ios': job[self.op]['short_ios'],
'dropped_ios': job[self.op]['short_ios'],
})
@Tracer.trace
def run(self):
LOG.info("Starting FIO test.")
cmd = [
"fio", "--thread", "--output-format=json",
"--randrepeat=%d" % self.iterations,
"--direct=1", "--gtod_reduce=1", "--name=test",
"--bs=%s" % self.bs, "--iodepth=%s" % self.iodepth,
"--size=%sM" % self.fio_size_mb,
"--readwrite=%s" % self.op,
"--numjobs=%s" % self.workers,
"--filename=%s" % self.image.path,
]
result = execute(*cmd)
LOG.info("Completed FIO test.")
self.process_result(result.stdout)
@classmethod
def print_results(cls,
title: str = "Benchmark results",
description: str = None):
if description:
title = "%s (%s)" % (title, description)
table = prettytable.PrettyTable(title=title)
table.field_names = ["stat", "min", "max", "mean",
"median", "std_dev",
"max 90%", "min 90%", "total"]
table.float_format = ".4"
s = array_stats([float(i["bw_bytes"]) / 1000_000 for i in cls.data])
table.add_row(["bandwidth (MB/s)",
s['min'], s['max'], s['mean'],
s['median'], s['std_dev'],
s['max_90'], s['min_90'], 'N/A'])
s = array_stats([float(i["runtime"]) / 1000 for i in cls.data])
table.add_row(["duration (s)",
s['min'], s['max'], s['mean'],
s['median'], s['std_dev'],
s['max_90'], s['min_90'], s['sum']])
s = array_stats([i["error"] for i in cls.data])
table.add_row(["errors",
s['min'], s['max'], s['mean'],
s['median'], s['std_dev'],
s['max_90'], s['min_90'], s['sum']])
s = array_stats([i["short_ios"] for i in cls.data])
table.add_row(["incomplete IOs",
s['min'], s['max'], s['mean'],
s['median'], s['std_dev'],
s['max_90'], s['min_90'], s['sum']])
s = array_stats([i["dropped_ios"] for i in cls.data])
table.add_row(["dropped IOs",
s['min'], s['max'], s['mean'],
s['median'], s['std_dev'],
s['max_90'], s['min_90'], s['sum']])
print(table)
class RbdStampTest(RbdTest):
@staticmethod
def _rand_float(min_val: float, max_val: float):
return min_val + (random.random() * max_val - min_val)
def _get_stamp(self):
buff = self.image_name.encode()
padding = 512 - len(buff)
buff += b'\0' * padding
return buff
@Tracer.trace
def _write_stamp(self):
with open(self.image.path, 'rb+') as disk:
stamp = self._get_stamp()
disk.write(stamp)
@Tracer.trace
def _read_stamp(self):
with open(self.image.path, 'rb') as disk:
return disk.read(len(self._get_stamp()))
@Tracer.trace
def run(self):
# Wait up to 5 seconds and then check the disk,
# ensuring that nobody else wrote to it.
time.sleep(self._rand_float(0, 5))
stamp = self._read_stamp()
assert(stamp == b'\0' * len(self._get_stamp()))
self._write_stamp()
stamp = self._read_stamp()
assert(stamp == self._get_stamp())
class TestRunner(object):
def __init__(self,
test_cls: typing.Type[RbdTest],
test_params: dict = {},
iterations: int = 1,
workers: int = 1,
stop_on_error: bool = False,
cleanup_on_error: bool = True):
self.test_cls = test_cls
self.test_params = test_params
self.iterations = iterations
self.workers = workers
self.executor = futures.ThreadPoolExecutor(max_workers=workers)
self.lock = threading.Lock()
self.completed = 0
self.errors = 0
self.stopped = False
self.stop_on_error = stop_on_error
self.cleanup_on_error = cleanup_on_error
@Tracer.trace
def run(self):
tasks = []
for i in range(self.iterations):
task = self.executor.submit(self.run_single_test)
tasks.append(task)
LOG.info("Waiting for %d tests to complete.", self.iterations)
for task in tasks:
task.result()
def run_single_test(self):
failed = False
if self.stopped:
return
try:
test = self.test_cls(**self.test_params)
test.initialize()
test.run()
except KeyboardInterrupt:
LOG.warning("Received Ctrl-C.")
self.stopped = True
except Exception as ex:
failed = True
if self.stop_on_error:
self.stopped = True
with self.lock:
self.errors += 1
LOG.exception(
"Test exception: %s. Total exceptions: %d",
ex, self.errors)
finally:
if not failed or self.cleanup_on_error:
try:
test.cleanup()
except KeyboardInterrupt:
LOG.warning("Received Ctrl-C.")
self.stopped = True
# Retry the cleanup
test.cleanup()
except Exception:
LOG.exception("Test cleanup failed.")
with self.lock:
self.completed += 1
LOG.info("Completed tests: %d. Pending: %d",
self.completed, self.iterations - self.completed)
TESTS: typing.Dict[str, typing.Type[RbdTest]] = {
'RbdTest': RbdTest,
'RbdFioTest': RbdFioTest,
'RbdStampTest': RbdStampTest
}
if __name__ == '__main__':
args = parser.parse_args()
log_level = logging.WARNING
if args.verbose:
log_level = logging.INFO
if args.debug:
log_level = logging.DEBUG
setup_logging(log_level)
test_params = dict(
image_size_mb=args.image_size_mb,
image_prefix=args.image_prefix,
bs=args.bs,
op=args.op,
iodepth=args.fio_depth,
map_timeout=args.map_timeout
)
try:
test_cls = TESTS[args.test_name]
except KeyError:
raise CephTestException("Unkown test: {}".format(args.test_name))
runner = TestRunner(
test_cls,
test_params=test_params,
iterations=args.iterations,
workers=args.concurrency,
stop_on_error=args.stop_on_error,
cleanup_on_error=not args.skip_cleanup_on_error)
runner.run()
Tracer.print_results()
test_cls.print_results(
description="count: %d, concurrency: %d" %
(args.iterations, args.concurrency))
assert runner.errors == 0, f"encountered {runner.errors} error(s)."