establish only 1 rados connection (#235)
This commit is contained in:
parent
46b06f317f
commit
b1548c0d96
7
main.go
7
main.go
|
@ -120,12 +120,17 @@ func main() {
|
|||
}
|
||||
|
||||
for _, cluster := range clusterConfigs {
|
||||
conn := rados.NewRadosConn(
|
||||
conn, err := rados.NewRadosConn(
|
||||
cluster.User,
|
||||
cluster.ConfigFile,
|
||||
*cephRadosOpTimeout,
|
||||
logger)
|
||||
|
||||
if err != nil {
|
||||
logger.WithError(err).WithField("cluster", cluster.ClusterLabel).Error("unable to create rados connection for cluster")
|
||||
continue
|
||||
}
|
||||
|
||||
prometheus.MustRegister(ceph.NewExporter(
|
||||
conn,
|
||||
cluster.ClusterLabel,
|
||||
|
|
|
@ -30,6 +30,7 @@ import (
|
|||
// that talks to a real Ceph cluster.
|
||||
type RadosConn struct {
|
||||
user string
|
||||
conn *rados.Conn
|
||||
configFile string
|
||||
timeout time.Duration
|
||||
logger *logrus.Logger
|
||||
|
@ -41,28 +42,34 @@ var _ ceph.Conn = &RadosConn{}
|
|||
// NewRadosConn returns a new RadosConn. Unlike the native rados.Conn, there
|
||||
// is no need to manage the connection before/after talking to the rados; it
|
||||
// is the responsibility of this *RadosConn to manage the connection.
|
||||
func NewRadosConn(user, configFile string, timeout time.Duration, logger *logrus.Logger) *RadosConn {
|
||||
return &RadosConn{
|
||||
func NewRadosConn(user, configFile string, timeout time.Duration, logger *logrus.Logger) (*RadosConn, error) {
|
||||
rc := &RadosConn{
|
||||
user: user,
|
||||
configFile: configFile,
|
||||
timeout: timeout,
|
||||
logger: logger,
|
||||
}
|
||||
|
||||
if err := rc.establishConn(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return rc, nil
|
||||
}
|
||||
|
||||
// newRadosConn creates an established rados connection to the Ceph cluster
|
||||
// using the provided Ceph user and configFile. Ceph parameters
|
||||
// rados_osd_op_timeout and rados_mon_op_timeout are specified by the timeout
|
||||
// value, where 0 means no limit.
|
||||
func (c *RadosConn) newRadosConn() (*rados.Conn, error) {
|
||||
func (c *RadosConn) establishConn() error {
|
||||
conn, err := rados.NewConnWithUser(c.user)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error creating rados connection: %s", err)
|
||||
return fmt.Errorf("error creating rados connection: %s", err)
|
||||
}
|
||||
|
||||
err = conn.ReadConfigFile(c.configFile)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error reading config file: %s", err)
|
||||
return fmt.Errorf("error reading config file: %s", err)
|
||||
}
|
||||
|
||||
tv := strconv.FormatFloat(c.timeout.Seconds(), 'f', -1, 64)
|
||||
|
@ -72,17 +79,17 @@ func (c *RadosConn) newRadosConn() (*rados.Conn, error) {
|
|||
// https://github.com/ceph/ceph/blob/d4872ce97a2825afcb58876559cc73aaa1862c0f/src/common/legacy_config_opts.h#L1258-L1259
|
||||
err = conn.SetConfigOption("rados_osd_op_timeout", tv)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error setting rados_osd_op_timeout: %s", err)
|
||||
return fmt.Errorf("error setting rados_osd_op_timeout: %s", err)
|
||||
}
|
||||
|
||||
err = conn.SetConfigOption("rados_mon_op_timeout", tv)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error setting rados_mon_op_timeout: %s", err)
|
||||
return fmt.Errorf("error setting rados_mon_op_timeout: %s", err)
|
||||
}
|
||||
|
||||
err = conn.SetConfigOption("client_mount_timeout", tv)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error setting client_mount_timeout: %s", err)
|
||||
return fmt.Errorf("error setting client_mount_timeout: %s", err)
|
||||
}
|
||||
|
||||
// Ceph may retry the connection up to 10 times internally, which essentially makes client_mount_timeout 10x longer.
|
||||
|
@ -96,33 +103,23 @@ func (c *RadosConn) newRadosConn() (*rados.Conn, error) {
|
|||
select {
|
||||
case err = <-ch:
|
||||
case <-time.After(c.timeout):
|
||||
return nil, fmt.Errorf("error connecting to rados: timeout")
|
||||
return fmt.Errorf("error connecting to rados: timeout")
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error connecting to rados: %s", err)
|
||||
return fmt.Errorf("error connecting to rados: %s", err)
|
||||
}
|
||||
|
||||
return conn, nil
|
||||
c.conn = conn
|
||||
return nil
|
||||
}
|
||||
|
||||
// MonCommand executes a monitor command to rados.
|
||||
func (c *RadosConn) MonCommand(args []byte) (buffer []byte, info string, err error) {
|
||||
ll := c.logger.WithField("args", string(args))
|
||||
|
||||
ll.Trace("creating rados connection to execute mon command")
|
||||
|
||||
conn, err := c.newRadosConn()
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
defer conn.Shutdown()
|
||||
|
||||
ll = ll.WithField("conn", conn.GetInstanceID())
|
||||
|
||||
ll := c.logger.WithField("args", string(args)).WithField("conn", c.conn.GetInstanceID())
|
||||
ll.Trace("start executing mon command")
|
||||
|
||||
buffer, info, err = conn.MonCommand(args)
|
||||
buffer, info, err = c.conn.MonCommand(args)
|
||||
|
||||
ll.WithError(err).Trace("complete executing mon command")
|
||||
|
||||
|
@ -131,21 +128,10 @@ func (c *RadosConn) MonCommand(args []byte) (buffer []byte, info string, err err
|
|||
|
||||
// MgrCommand executes a manager command to rados.
|
||||
func (c *RadosConn) MgrCommand(args [][]byte) (buffer []byte, info string, err error) {
|
||||
ll := c.logger.WithField("args", string(bytes.Join(args, []byte(","))))
|
||||
|
||||
ll.Trace("creating rados connection to execute mgr command")
|
||||
|
||||
conn, err := c.newRadosConn()
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
defer conn.Shutdown()
|
||||
|
||||
ll = ll.WithField("conn", conn.GetInstanceID())
|
||||
|
||||
ll := c.logger.WithField("args", string(bytes.Join(args, []byte(",")))).WithField("conn", c.conn.GetInstanceID())
|
||||
ll.Trace("start executing mgr command")
|
||||
|
||||
buffer, info, err = conn.MgrCommand(args)
|
||||
buffer, info, err = c.conn.MgrCommand(args)
|
||||
|
||||
ll.WithError(err).Trace("complete executing mgr command")
|
||||
|
||||
|
@ -154,21 +140,10 @@ func (c *RadosConn) MgrCommand(args [][]byte) (buffer []byte, info string, err e
|
|||
|
||||
// GetPoolStats returns the count of unfound objects for the given rados pool.
|
||||
func (c *RadosConn) GetPoolStats(pool string) (*ceph.PoolStat, error) {
|
||||
ll := c.logger.WithField("pool", pool)
|
||||
|
||||
ll.Trace("creating rados connection to get pool stats")
|
||||
|
||||
conn, err := c.newRadosConn()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer conn.Shutdown()
|
||||
|
||||
ll = ll.WithField("conn", conn.GetInstanceID())
|
||||
|
||||
ll := c.logger.WithField("pool", pool).WithField("conn", c.conn.GetInstanceID())
|
||||
ll.Trace("opening IOContext for pool")
|
||||
|
||||
ioCtx, err := conn.OpenIOContext(pool)
|
||||
ioCtx, err := c.conn.OpenIOContext(pool)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue