mirror of
https://github.com/ceph/ceph
synced 2025-04-01 23:02:17 +00:00
mgr/dashboard: Replace IP address validation with Python standard library functions
Instead of self-written validation methods to validate IPv4 and IPv6 addresses. Use Python's standard library functions `ipaddress`. Signed-off-by: Ashish Singh <assingh@redhat.com>
This commit is contained in:
parent
7cbbe9acea
commit
aeb1c11334
@ -473,6 +473,7 @@ Group: System/Filesystems
|
|||||||
Requires: ceph-mgr = %{_epoch_prefix}%{version}-%{release}
|
Requires: ceph-mgr = %{_epoch_prefix}%{version}-%{release}
|
||||||
%if 0%{?fedora} || 0%{?rhel}
|
%if 0%{?fedora} || 0%{?rhel}
|
||||||
Requires: python%{_python_buildid}-cherrypy
|
Requires: python%{_python_buildid}-cherrypy
|
||||||
|
Requires: python%{_python_buildid}-ipaddress
|
||||||
Requires: python%{_python_buildid}-jwt
|
Requires: python%{_python_buildid}-jwt
|
||||||
Requires: python%{_python_buildid}-routes
|
Requires: python%{_python_buildid}-routes
|
||||||
Requires: python%{_python_buildid}-werkzeug
|
Requires: python%{_python_buildid}-werkzeug
|
||||||
|
1
debian/control
vendored
1
debian/control
vendored
@ -208,6 +208,7 @@ Package: ceph-mgr-dashboard
|
|||||||
Architecture: all
|
Architecture: all
|
||||||
Depends: ceph-mgr (= ${binary:Version}),
|
Depends: ceph-mgr (= ${binary:Version}),
|
||||||
python-cherrypy3,
|
python-cherrypy3,
|
||||||
|
python-ipaddress,
|
||||||
python-jwt,
|
python-jwt,
|
||||||
python-openssl,
|
python-openssl,
|
||||||
python-bcrypt,
|
python-bcrypt,
|
||||||
|
@ -2,11 +2,13 @@
|
|||||||
from __future__ import absolute_import
|
from __future__ import absolute_import
|
||||||
|
|
||||||
import re
|
import re
|
||||||
|
import ipaddress
|
||||||
from distutils.util import strtobool
|
from distutils.util import strtobool
|
||||||
|
import six
|
||||||
from ..awsauth import S3Auth
|
from ..awsauth import S3Auth
|
||||||
from ..settings import Settings, Options
|
from ..settings import Settings, Options
|
||||||
from ..rest_client import RestClient, RequestException
|
from ..rest_client import RestClient, RequestException
|
||||||
from ..tools import build_url, dict_contains_path, is_valid_ip_address
|
from ..tools import build_url, dict_contains_path
|
||||||
from .. import mgr, logger
|
from .. import mgr, logger
|
||||||
|
|
||||||
|
|
||||||
@ -125,9 +127,11 @@ def _parse_addr(value):
|
|||||||
# Group 1: [
|
# Group 1: [
|
||||||
# Group 2: 2001:db8:85a3::8a2e:370:7334
|
# Group 2: 2001:db8:85a3::8a2e:370:7334
|
||||||
addr = match.group(3) if match.group(3) else match.group(2)
|
addr = match.group(3) if match.group(3) else match.group(2)
|
||||||
if not is_valid_ip_address(addr):
|
try:
|
||||||
|
ipaddress.ip_address(six.u(addr))
|
||||||
|
return addr
|
||||||
|
except ValueError:
|
||||||
raise LookupError('Invalid RGW address \'{}\' found'.format(addr))
|
raise LookupError('Invalid RGW address \'{}\' found'.format(addr))
|
||||||
return addr
|
|
||||||
raise LookupError('Failed to determine RGW address')
|
raise LookupError('Failed to determine RGW address')
|
||||||
|
|
||||||
|
|
||||||
|
@ -11,8 +11,7 @@ from . import ControllerTestCase
|
|||||||
from ..services.exception import handle_rados_error
|
from ..services.exception import handle_rados_error
|
||||||
from ..controllers import RESTController, ApiController, Controller, \
|
from ..controllers import RESTController, ApiController, Controller, \
|
||||||
BaseController, Proxy
|
BaseController, Proxy
|
||||||
from ..tools import is_valid_ipv6_address, dict_contains_path, \
|
from ..tools import dict_contains_path, RequestLoggingTool
|
||||||
RequestLoggingTool
|
|
||||||
|
|
||||||
|
|
||||||
# pylint: disable=W0613
|
# pylint: disable=W0613
|
||||||
@ -172,14 +171,6 @@ class RequestLoggingToolTest(ControllerTestCase):
|
|||||||
|
|
||||||
class TestFunctions(unittest.TestCase):
|
class TestFunctions(unittest.TestCase):
|
||||||
|
|
||||||
def test_is_valid_ipv6_address(self):
|
|
||||||
self.assertTrue(is_valid_ipv6_address('::'))
|
|
||||||
self.assertTrue(is_valid_ipv6_address('::1'))
|
|
||||||
self.assertFalse(is_valid_ipv6_address('127.0.0.1'))
|
|
||||||
self.assertFalse(is_valid_ipv6_address('localhost'))
|
|
||||||
self.assertTrue(is_valid_ipv6_address('1200:0000:AB00:1234:0000:2552:7777:1313'))
|
|
||||||
self.assertFalse(is_valid_ipv6_address('1200::AB00:1234::2552:7777:1313'))
|
|
||||||
|
|
||||||
def test_dict_contains_path(self):
|
def test_dict_contains_path(self):
|
||||||
x = {'a': {'b': {'c': 'foo'}}}
|
x = {'a': {'b': {'c': 'foo'}}}
|
||||||
self.assertTrue(dict_contains_path(x, ['a', 'b', 'c']))
|
self.assertTrue(dict_contains_path(x, ['a', 'b', 'c']))
|
||||||
|
@ -5,6 +5,7 @@ import sys
|
|||||||
import inspect
|
import inspect
|
||||||
import json
|
import json
|
||||||
import functools
|
import functools
|
||||||
|
import ipaddress
|
||||||
|
|
||||||
import collections
|
import collections
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
@ -12,7 +13,7 @@ from distutils.util import strtobool
|
|||||||
import fnmatch
|
import fnmatch
|
||||||
import time
|
import time
|
||||||
import threading
|
import threading
|
||||||
import socket
|
import six
|
||||||
from six.moves import urllib
|
from six.moves import urllib
|
||||||
import cherrypy
|
import cherrypy
|
||||||
|
|
||||||
@ -646,110 +647,6 @@ class Task(object):
|
|||||||
self.lock.release()
|
self.lock.release()
|
||||||
|
|
||||||
|
|
||||||
def is_valid_ip_address(addr):
|
|
||||||
"""
|
|
||||||
Validate the given IPv4 or IPv6 address.
|
|
||||||
|
|
||||||
>>> is_valid_ip_address('2001:0db8::1234')
|
|
||||||
True
|
|
||||||
|
|
||||||
>>> is_valid_ip_address('192.168.121.1')
|
|
||||||
True
|
|
||||||
|
|
||||||
>>> is_valid_ip_address('1:::1')
|
|
||||||
False
|
|
||||||
|
|
||||||
>>> is_valid_ip_address('8.1.0')
|
|
||||||
False
|
|
||||||
|
|
||||||
>>> is_valid_ip_address('260.1.0.1')
|
|
||||||
False
|
|
||||||
|
|
||||||
:param addr:
|
|
||||||
:type addr: str
|
|
||||||
:return: Returns ``True`` if the IP address is valid,
|
|
||||||
otherwise ``False``.
|
|
||||||
:rtype: bool
|
|
||||||
"""
|
|
||||||
return is_valid_ipv4_address(addr) or is_valid_ipv6_address(addr)
|
|
||||||
|
|
||||||
|
|
||||||
def is_valid_ipv4_address(addr):
|
|
||||||
"""
|
|
||||||
Validate the given IPv4 address.
|
|
||||||
|
|
||||||
>>> is_valid_ipv4_address('0.0.0.0')
|
|
||||||
True
|
|
||||||
|
|
||||||
>>> is_valid_ipv4_address('192.168.121.1')
|
|
||||||
True
|
|
||||||
|
|
||||||
>>> is_valid_ipv4_address('a.b.c.d')
|
|
||||||
False
|
|
||||||
|
|
||||||
>>> is_valid_ipv4_address('172.1.0.a')
|
|
||||||
False
|
|
||||||
|
|
||||||
>>> is_valid_ipv4_address('2001:0db8::1234')
|
|
||||||
False
|
|
||||||
|
|
||||||
>>> is_valid_ipv4_address(None)
|
|
||||||
False
|
|
||||||
|
|
||||||
>>> is_valid_ipv4_address(123456)
|
|
||||||
False
|
|
||||||
|
|
||||||
:param addr:
|
|
||||||
:type addr: str
|
|
||||||
:return: Returns ``True`` if the IPv4 address is valid,
|
|
||||||
otherwise ``False``.
|
|
||||||
:rtype: bool
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
socket.inet_pton(socket.AF_INET, addr)
|
|
||||||
return True
|
|
||||||
except (socket.error, TypeError):
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
def is_valid_ipv6_address(addr):
|
|
||||||
"""
|
|
||||||
Validate the given IPv6 address.
|
|
||||||
|
|
||||||
>>> is_valid_ipv6_address('2001:0db8::1234')
|
|
||||||
True
|
|
||||||
|
|
||||||
>>> is_valid_ipv6_address('fe80::bc6c:66b0:5af8:f44')
|
|
||||||
True
|
|
||||||
|
|
||||||
>>> is_valid_ipv6_address('192.168.121.1')
|
|
||||||
False
|
|
||||||
|
|
||||||
>>> is_valid_ipv6_address('a:x::1')
|
|
||||||
False
|
|
||||||
|
|
||||||
>>> is_valid_ipv6_address('1200:0000:AB00:1234:O000:2552:7777:1313')
|
|
||||||
False
|
|
||||||
|
|
||||||
>>> is_valid_ipv6_address(None)
|
|
||||||
False
|
|
||||||
|
|
||||||
>>> is_valid_ipv6_address(123456)
|
|
||||||
False
|
|
||||||
|
|
||||||
:param addr:
|
|
||||||
:type addr: str
|
|
||||||
:return: Returns ``True`` if the IPv6 address is valid,
|
|
||||||
otherwise ``False``.
|
|
||||||
:rtype: bool
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
socket.inet_pton(socket.AF_INET6, addr)
|
|
||||||
return True
|
|
||||||
except (socket.error, TypeError):
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
def build_url(host, scheme=None, port=None):
|
def build_url(host, scheme=None, port=None):
|
||||||
"""
|
"""
|
||||||
Build a valid URL. IPv6 addresses specified in host will be enclosed in brackets
|
Build a valid URL. IPv6 addresses specified in host will be enclosed in brackets
|
||||||
@ -772,7 +669,11 @@ def build_url(host, scheme=None, port=None):
|
|||||||
:type port: int
|
:type port: int
|
||||||
:rtype: str
|
:rtype: str
|
||||||
"""
|
"""
|
||||||
netloc = host if not is_valid_ipv6_address(host) else '[{}]'.format(host)
|
try:
|
||||||
|
ipaddress.IPv6Address(six.u(host))
|
||||||
|
netloc = '[{}]'.format(host)
|
||||||
|
except ValueError:
|
||||||
|
netloc = host
|
||||||
if port:
|
if port:
|
||||||
netloc += ':{}'.format(port)
|
netloc += ':{}'.format(port)
|
||||||
pr = urllib.parse.ParseResult(
|
pr = urllib.parse.ParseResult(
|
||||||
|
Loading…
Reference in New Issue
Block a user