Merge branch 'mars0.1.y' into mars0.1a.y

This commit is contained in:
Thomas Schoebel-Theuer 2018-02-01 06:25:02 +01:00
commit 1022c21ac6
20 changed files with 1110 additions and 342 deletions

View File

@ -232,6 +232,21 @@ mars0.1abeta0
-----------------------------------
Changelog for series 0.1:
mars0.1stable53
* Major fix: rare corner case of split brain was not displayed
correctly.
* Major usability: show amount of data during split brain.
This hints the sysadmins at the size of potential data loss
at later split brain resolution.
* Minor workaround: crashed /mars filesystems may contain
completely damaged symlinks with timestamps in the far
distant future, e.g. year >3000 etc. Safeguard unusual
Lamport time slips by ignoring implausible values.
* Major improvement: internal locking overhead reduced.
* Minor improvement: reduce message trigger overhead.
* Several minor improvements.
* Doc updates.
mars0.1stable52
* Major contrib: new example scripts for MARS background data
migration during production. 1&1-specific code in a separate

View File

@ -142,7 +142,7 @@ tst@1und1.de
\end_layout
\begin_layout Date
Version 0.1a-2
Version 0.1a-5
\end_layout
\begin_layout Lowertitleback
@ -336,7 +336,7 @@ name "chap:Why-You-should"
\end_layout
\begin_layout Section
Cost Arguments from Architecture
Cost and Scalability Arguments from Architecture
\end_layout
\begin_layout Standard
@ -414,11 +414,17 @@ any
\end_inset
network connections running in parallel.
Even if the total network throughput would be scaling only with
Even if the total network throughput scales only with
\begin_inset Formula $O(n)$
\end_inset
, the network has to
, nevertheless
\begin_inset Formula $O(n^{2})$
\end_inset
network connections have to be maintained by connection-oriented protocols
and at various layers of the operating software, and the network has to
\emph on
switch
\emph default
@ -452,7 +458,7 @@ In order to limit error propagation from other networks, the storage network
\emph on
physically separate
\emph default
/
=
\emph on
dedicated
\emph default
@ -466,8 +472,8 @@ Because storage networks are heavily reacting to high latencies and packet
\series bold
worst case
\series default
(load peaks, packet storms, etc), needing one of the best = most expensive
components for reducing latency and increasing throughput.
(load peaks, packet storms, etc), needing one of the best = typically most
expensive components for reducing latency and increasing throughput.
Dimensioning to the worst case instead of an average case plus some safety
margins is nothing but an expensive
\series bold
@ -504,15 +510,15 @@ sharding architecture
\begin_inset Quotes erd
\end_inset
which does not need a storage network at all, at least when built and dimensioned properly.
which does not need a dedicated storage network at all, at least when built
and dimensioned properly.
Instead, it
\emph on
should have
\emph default
(but not always needs) a so-called replication network which can, when
present, be dimensioned much smaller because it does neither need realtime
operations, nor scalability to
operations nor scalability to
\begin_inset Formula $O(n^{2})$
\end_inset
@ -562,12 +568,7 @@ Even in cases when any customer may potentially access any of the data items
Only when partitioning of input traffic plus data is not possible in a reasonable
way, big cluster architectures as implemented for example in Ceph or Swift
(and partly even possible with MARS when restricted to the block layer)
have their
\series bold
usecase
\series default
.
Only under such a precondition they are really needed.
have a very clear use case.
\end_layout
\begin_layout Standard
@ -585,7 +586,7 @@ When sharding is possible, it is the preferred model due to cost and performance
\end_inset
Notice that MARS' new remote device feature from the 0.2 branch series (which
is a replacement for iSCSI)
is a kind of replacement for iSCSI)
\emph on
could
\emph default
@ -601,7 +602,15 @@ big cluster
\end_layout
\begin_layout Standard
Nevertheless, this sub-variant is not the preferred model.
Nevertheless, such models re-introducing some kind of
\begin_inset Quotes eld
\end_inset
dedicated storage network
\begin_inset Quotes erd
\end_inset
into MARS operations are not the preferred model.
Following is a super-model which combines both the
\begin_inset Quotes eld
\end_inset
@ -610,7 +619,7 @@ big cluster
\begin_inset Quotes erd
\end_inset
and sharding model at block lyer in a very flexible way.
and sharding model at block layer in a very flexible way.
The following example shows only two servers from a pool consisting of
hundreds or thousands of servers:
\begin_inset Separator latexpar
@ -673,8 +682,8 @@ exceptionally(!)
\end_layout
\begin_layout Standard
Notice that locally running VMs doesn't produce any storage network traffic
at all.
Notice that local operations of VMs don't produce any storage network
traffic at all.
Therefore, this is the preferred runtime configuration.
\end_layout
@ -685,7 +694,7 @@ Only in cases of resource imbalance, such as (transient) CPU or RAM peaks
\emph on
some
\emph default
containers may be run somewhere else over the network.
VMs or containers may be run somewhere else over the network.
In a well-balanced and well-dimensioned system, this will be the
\series bold
vast minority
@ -739,23 +748,12 @@ reference "sec:Cost-Arguments-from"
\begin_layout Standard
The sharding model needs a different approach to load balancing of storage
space than the big cluster model.
There are several possibilities at different layers:
\end_layout
\begin_layout Itemize
Dynamically growing the sizes of LVs via
\family typewriter
lvresize
\family default
followed by
\family typewriter
marsadm resize
\family default
followed by
\family typewriter
xfs_growfs
\family default
or similar operations.
There are several possibilities at different layers, each addressing different
\series bold
granularities
\series default
:
\end_layout
\begin_layout Itemize
@ -768,6 +766,51 @@ rsync
mysqldump
\family default
or similar.
\begin_inset Newline newline
\end_inset
Example: at 1&1 Shared Hosting Linux, we have about 9 million customer
home directories.
We also have a script
\family typewriter
movespace.pl
\family default
using incremental
\family typewriter
tar
\family default
for their moves.
Now, if we tried to move around
\emph on
all
\emph default
of them this way, it could easily take years or even decades for millions
of extremely small home directories, due to overhead like DNS updates etc.
However, there exists a small handful of large customer home directories
in the terabyte range.
For these, and only for these, it is a clever idea to use
\family typewriter
movespace.pl
\family default
because thereby the space consumption can be regulated at a finer granularity
than at whole-LV level.
\end_layout
\begin_layout Itemize
Dynamically growing the sizes of LVs during operations:
\family typewriter
lvresize
\family default
followed by
\family typewriter
marsadm resize
\family default
followed by
\family typewriter
xfs_growfs
\family default
or similar operations.
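A minimal hypothetical sketch of this sequence for a resource mydata on
a volume group vg00 (names are placeholders; the underlying LVs of all
replicas should be grown before the marsadm resize):
lvresize --size +100G /dev/vg00/mydata
marsadm resize mydata
xfs_growfs /mnt/mydata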
\end_layout
\begin_layout Itemize
@ -792,7 +835,11 @@ The idea is to dynamically create
\emph on
additional
\emph default
LV replicas for the sake of background migration.
LV replicas for the sake of
\series bold
background migration
\series default
.
Examples:
\end_layout
@ -888,6 +935,18 @@ In case you already have a redundant LV copy somewhere, you should run a
\end_inset
when moving pairs at once.
\begin_inset Newline newline
\end_inset
Example: see
\family typewriter
tetris.sh
\family default
in the
\family typewriter
contrib/
\family default
directory of MARS.
\end_layout
\begin_layout Itemize
@ -1242,7 +1301,7 @@ Cloud storage is way too much hyped.
\emph on
extremely
\emph default
varying over time, and when you need some
varying over time, and/or when you need some
\emph on
extra
\emph default
@ -1591,9 +1650,10 @@ practice
\begin_layout Standard
Therefore a big cluster typically needs >200% disks to reach the same net
capacity as a sharded cluster, where typically hardware RAID-60 with a
significantly smaller overhead is sufficient for providing sufficient failure
tolerance at disk level.
capacity as a simple sharded cluster.
The latter can take advantage of hardware RAID-60 with a significantly
smaller disk overhead, while providing sufficient failure tolerance at
disk level.
\end_layout
\begin_layout Standard
@ -5395,7 +5455,8 @@ marsadm primary
\family typewriter
marsadm up
\family default
), and that any old primary (in this case
), that you are no longer a sync target, and that any old primary (in this
case
\family typewriter
A
\family default
@ -5425,8 +5486,8 @@ marsadm primary
\end_layout
\begin_layout Standard
The preconditions try to protect you from doing silly things, such as accidentally provoking a split brain error state.
These preconditions try to protect you from doing silly things, such as
accidentally provoking a split brain error state.
We try to avoid split brain as best as we can.
Therefore, we distinguish between
\emph on
@ -5594,8 +5655,9 @@ current
last known
\emph default
state).
The following command sequence will skip many checks and tell your node
to become primary forcefully:
The following command sequence will skip many checks (essentially you just
need to be attached and you must not be a current sync target) and tell
your node to become primary forcefully:
\end_layout
\begin_layout Itemize
@ -6117,6 +6179,92 @@ marsadm down
primary.
\end_layout
\begin_layout Standard
\begin_inset Graphics
filename images/MatieresToxiques.png
lyxscale 50
scale 17
\end_inset
\family typewriter
marsadm primary force
\family default
is rejected in newer
\begin_inset Foot
status open
\begin_layout Plain Layout
Beware: older versions before
\family typewriter
mars0.1stable52
\family default
deliberately skipped this check because a few years ago somebody at 1&1
placed a
\emph on
requirement
\emph default
on this.
Fortunately, that requirement is now gone, so a safer behaviour could
be implemented.
The new behaviour is for your safety, to prevent you from doing
\begin_inset Quotes eld
\end_inset
silly
\begin_inset Quotes erd
\end_inset
things in case you are under pressure during an incident (trying to safeguard
against human error as best as possible).
\end_layout
\end_inset
marsadm versions if your replica is a current sync target.
This is not a bug: it should prevent you from forcing an inconsistent replica
into primary mode, which will
\emph on
certainly
\emph default
lead to inconsistent data.
However, in extremely rare cases of severe damage of
\emph on
all
\emph default
of your replicas, you may be desperate.
Only in such a rare case, and only then, you might decide to force any
of your replicas (e.g.
based on their last sync progress bar) into primary role although none
of the re-syncs had finished before.
In such a case, and only if you really know what you are doing, you may
use
\family typewriter
marsadm fake-sync
\family default
to first mark your inconsistent replica as UpToDate (which is a
\series bold
lie
\series default
) and then force it to primary as explained above.
Afterwards, you will certainly need an
\family typewriter
fsck
\family default
or similar repair before you can restart your application.
Good luck! And don't forget to check the size of
\family typewriter
lost+found
\family default
afterwards.
This is really your
\emph on
very last
\emph default
chance if nothing else had succeeded before.
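A hedged sketch of this very last resort, using the manual's example
resource mydata (the device path is an assumption; adapt it to your setup):
marsadm fake-sync mydata
marsadm primary --force mydata
fsck /dev/mars/mydata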
\end_layout
\begin_layout Subsection
Split Brain Resolution
\begin_inset CommandInset label
@ -10705,6 +10853,24 @@ marsadm view mydata
\begin_layout Labeling
\labelwidthstring 00.00.0000
\family typewriter
[
\emph on
this_count
\emph default
/
\emph on
total_count
\emph default
]
\family default
number of replicas of this resource, out of the total number of cluster
members.
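Example: [2/4] means that 2 replicas of this resource exist among 4 cluster
members.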
\end_layout
\begin_layout Labeling
\labelwidthstring 00.00.0000
\family typewriter
%include{diskstate}
\family default
@ -11633,7 +11799,7 @@ mars_light
\family typewriter
%{window}
\family default
(default 30) seconds.
(default 60) seconds.
Notice that this may happen when deleting
\emph on
extremely
@ -11873,7 +12039,7 @@ PrimaryUnreachable
%is-alive{}
\family default
) A current designated primary has been set, but this host has not been
remotely updated for more than 30 seconds (see also
remotely updated for more than 60 seconds (see also
\family typewriter
--window=$seconds
\family default
@ -16204,6 +16370,15 @@ reference "sec:The-Lamport-Clock"
\begin_layout Itemize
\family typewriter
%real-time{}
\family default
Return the current system clock timestamp, in units of seconds since the
Unix epoch.
\end_layout
\begin_layout Itemize
\family typewriter
%sleep{
\emph on
@ -16495,7 +16670,7 @@ The value given by the
\family typewriter
--window=
\family default
option, or the corresponding default value.
option, or the corresponding default value (60s).
\end_layout
\begin_layout Itemize
@ -17421,7 +17596,7 @@ status open
The time window for checking the aliveness of other nodes in the network.
When no symlink updates have occurred during the last window, the node
is considered dead.
Default is 30s
Default is 60s.
\end_layout
\end_inset
@ -17912,7 +18087,7 @@ name "sec:Cluster-Operations"
\size scriptsize
\begin_inset Tabular
<lyxtabular version="3" rows="6" columns="3">
<lyxtabular version="3" rows="9" columns="3">
<features islongtable="true" longtabularalignment="left">
<column alignment="left" valignment="top" width="0pt">
<column alignment="center" valignment="top">
@ -18613,6 +18788,419 @@ mkfs.ext4
\end_inset
\end_layout
\end_inset
</cell>
</row>
<row>
<cell alignment="center" valignment="top" topline="true" leftline="true" usebox="none">
\begin_inset Text
\begin_layout Plain Layout
\family typewriter
\size scriptsize
\begin_inset Box Frameless
position "t"
hor_pos "c"
has_inner_box 1
inner_pos "t"
use_parbox 0
use_makebox 0
width "20col%"
special "none"
height "1in"
height_special "totalheight"
thickness "0.4pt"
separation "3pt"
shadowsize "4pt"
framecolor "black"
backgroundcolor "none"
status open
\begin_layout Plain Layout
\family typewriter
\size scriptsize
merge-cluster
\begin_inset Newline newline
\end_inset
\begin_inset ERT
status open
\begin_layout Plain Layout
\backslash
strut
\backslash
hfill
\end_layout
\end_inset
$host
\end_layout
\end_inset
\end_layout
\end_inset
</cell>
<cell alignment="center" valignment="top" topline="true" leftline="true" usebox="none">
\begin_inset Text
\begin_layout Plain Layout
\size scriptsize
no
\end_layout
\end_inset
</cell>
<cell alignment="center" valignment="top" topline="true" leftline="true" rightline="true" usebox="none">
\begin_inset Text
\begin_layout Plain Layout
\size scriptsize
\begin_inset Box Frameless
position "t"
hor_pos "c"
has_inner_box 1
inner_pos "t"
use_parbox 0
use_makebox 0
width "60col%"
special "none"
height "1in"
height_special "totalheight"
thickness "0.4pt"
separation "3pt"
shadowsize "4pt"
framecolor "black"
backgroundcolor "none"
status open
\begin_layout Plain Layout
\size scriptsize
Precondition: the set of resources at the local cluster (transitively) and
at the cluster of
\family typewriter
$host
\family default
(transitively) must be disjoint.
\end_layout
\begin_layout Plain Layout
\size scriptsize
Create the union of both clusters, consisting of the union of all participating
machines (transitively).
Resource memberships are unaffected.
This is useful for creating a
\begin_inset Quotes eld
\end_inset
virtual LVM cluster
\begin_inset Quotes erd
\end_inset
where resources can be migrated later via
\family typewriter
join-resource
\family default
/
\family typewriter
leave-resource
\family default
operations.
\end_layout
\begin_layout Plain Layout
\begin_inset Graphics
filename images/MatieresCorrosives.png
lyxscale 50
scale 17
\end_inset
\size scriptsize
Attention! The mars branch
\family typewriter
0.1.y
\family default
does not scale well in the number of cluster members, because it evolved
from a lab prototype with
\begin_inset Formula $O(n^{2})$
\end_inset
behaviour at metadata exchange.
Never exceed the maximum cluster members as described in appendix
\begin_inset CommandInset ref
LatexCommand vref
reference "chap:Technical-Data-MARS"
\end_inset
.
For safety, you should stay at 1/2 of the numbers mentioned there.
Use
\family typewriter
split-cluster
\family default
for going back to smaller clusters again after your background data migration
has completed.
\end_layout
\begin_layout Plain Layout
\begin_inset Graphics
filename images/lightbulb_brightlit_benj_.png
lyxscale 12
scale 7
\end_inset
\size scriptsize
Future versions of MARS, starting with branch
\family typewriter
0.1b.y
\family default
will be constructed for very big clusters in the range of thousands of
nodes.
Development has not yet stabilized there, and operational experiences are
missing at the moment.
Be careful until official announcements appear in the ChangeLog, reporting
operational experience from the 1&1 big cluster at metadata level.
\end_layout
\end_inset
\end_layout
\end_inset
</cell>
</row>
<row>
<cell alignment="center" valignment="top" topline="true" leftline="true" usebox="none">
\begin_inset Text
\begin_layout Plain Layout
\family typewriter
\size scriptsize
\begin_inset Box Frameless
position "t"
hor_pos "c"
has_inner_box 1
inner_pos "t"
use_parbox 0
use_makebox 0
width "20col%"
special "none"
height "1in"
height_special "totalheight"
thickness "0.4pt"
separation "3pt"
shadowsize "4pt"
framecolor "black"
backgroundcolor "none"
status open
\begin_layout Plain Layout
\family typewriter
\size scriptsize
merge-cluster-check
\begin_inset Newline newline
\end_inset
\begin_inset ERT
status open
\begin_layout Plain Layout
\backslash
strut
\backslash
hfill
\end_layout
\end_inset
$host
\end_layout
\end_inset
\end_layout
\end_inset
</cell>
<cell alignment="center" valignment="top" topline="true" leftline="true" usebox="none">
\begin_inset Text
\begin_layout Plain Layout
\size scriptsize
no
\end_layout
\end_inset
</cell>
<cell alignment="center" valignment="top" topline="true" leftline="true" rightline="true" usebox="none">
\begin_inset Text
\begin_layout Plain Layout
\size scriptsize
\begin_inset Box Frameless
position "t"
hor_pos "c"
has_inner_box 1
inner_pos "t"
use_parbox 0
use_makebox 0
width "60col%"
special "none"
height "1in"
height_special "totalheight"
thickness "0.4pt"
separation "3pt"
shadowsize "4pt"
framecolor "black"
backgroundcolor "none"
status open
\begin_layout Plain Layout
\size scriptsize
Check in advance whether the sets of resources at the local cluster and at
the other cluster
\family typewriter
$host
\family default
are disjoint.
\end_layout
\end_inset
\end_layout
\end_inset
</cell>
</row>
<row>
<cell alignment="center" valignment="top" topline="true" leftline="true" usebox="none">
\begin_inset Text
\begin_layout Plain Layout
\family typewriter
\size scriptsize
\begin_inset Box Frameless
position "t"
hor_pos "c"
has_inner_box 1
inner_pos "t"
use_parbox 0
use_makebox 0
width "20col%"
special "none"
height "1in"
height_special "totalheight"
thickness "0.4pt"
separation "3pt"
shadowsize "4pt"
framecolor "black"
backgroundcolor "none"
status open
\begin_layout Plain Layout
\family typewriter
\size scriptsize
split-cluster
\end_layout
\end_inset
\end_layout
\end_inset
</cell>
<cell alignment="center" valignment="top" topline="true" leftline="true" usebox="none">
\begin_inset Text
\begin_layout Plain Layout
\size scriptsize
no
\end_layout
\end_inset
</cell>
<cell alignment="center" valignment="top" topline="true" leftline="true" rightline="true" usebox="none">
\begin_inset Text
\begin_layout Plain Layout
\size scriptsize
\begin_inset Box Frameless
position "t"
hor_pos "c"
has_inner_box 1
inner_pos "t"
use_parbox 0
use_makebox 0
width "60col%"
special "none"
height "1in"
height_special "totalheight"
thickness "0.4pt"
separation "3pt"
shadowsize "4pt"
framecolor "black"
backgroundcolor "none"
status open
\begin_layout Plain Layout
\size scriptsize
This is almost the inverse operation of
\family typewriter
merge-cluster
\family default
: it determines the minimal sub-cluster groups participating in some common
resources.
Then it splits the cluster memberships such that unnecessary connections
between non-related nodes are interrupted.
\end_layout
\begin_layout Plain Layout
\size scriptsize
Use this to avoid overly big clusters.
\end_layout
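A hypothetical end-to-end sketch of such a background migration ($host,
mydata and the LV path are placeholders):
marsadm merge-cluster-check $host
marsadm merge-cluster $host
marsadm join-resource mydata /dev/vg00/mydata   # on the target node
# wait for the sync to finish, hand over the primary role, then:
marsadm leave-resource mydata                   # on the old node
marsadm split-cluster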
\end_inset
\end_layout
\end_inset
@ -20242,6 +20830,7 @@ all
\end_layout
\begin_layout Standard
\noindent
\size scriptsize
\begin_inset Tabular
@ -24471,7 +25060,7 @@ pause-replay
primary --force
\family default
is a potentially harmful variant, because it will provoke a split brain
in many cases, and therefore in turn will lead to
in most cases, and therefore in turn will lead to
\series bold
data loss
\series default
@ -24795,6 +25384,20 @@ reference "subsec:Forced-Switching"
\end_inset
.
For your safety,
\family typewriter
force
\family default
does not work in newer marsadm (after mars0.1stable52) when your replica
is a current sync target.
For more explanations, see section
\begin_inset CommandInset ref
LatexCommand vref
reference "subsec:Forced-Switching"
\end_inset
.
\end_layout
@ -35377,7 +35980,14 @@ The Generic Object and Aspect Infrastructure
\begin_layout Chapter
\start_of_appendix
Technical Data MARS
Technical Data MARS
\begin_inset CommandInset label
LatexCommand label
name "chap:Technical-Data-MARS"
\end_inset
\end_layout
\begin_layout Standard

Binary file not shown.

View File

@ -127,7 +127,7 @@ EXPORT_SYMBOL_GPL(default_channel);
static struct say_channel *channel_list = NULL;
static rwlock_t say_lock = __RW_LOCK_UNLOCKED(say_lock);
static DECLARE_RWSEM(say_mutex);
static struct task_struct *say_thread = NULL;
@ -135,11 +135,17 @@ static DECLARE_WAIT_QUEUE_HEAD(say_event);
bool say_dirty = false;
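/* True when we must not sleep: in softirq/hardirq/NMI context, inside
 * an atomic section, or with interrupts disabled. Callers bail out or
 * fall back instead of taking the rwsem in these cases.
 */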
static
bool cannot_schedule(void)
{
return (preempt_count() & (SOFTIRQ_MASK | HARDIRQ_MASK | NMI_MASK)) != 0 || in_atomic() || irqs_disabled();
}
static
void wait_channel(struct say_channel *ch, int class)
{
if (delay_say_on_overflow && ch->ch_index[class] > SAY_BUF_LIMIT) {
bool use_atomic = (preempt_count() & (SOFTIRQ_MASK | HARDIRQ_MASK | NMI_MASK)) != 0 || in_atomic() || irqs_disabled();
bool use_atomic = cannot_schedule();
if (!use_atomic) {
say_dirty = true;
wake_up_interruptible(&say_event);
@ -153,9 +159,11 @@ struct say_channel *find_channel(const void *id)
{
struct say_channel *res = default_channel;
struct say_channel *ch;
unsigned long flags;
read_lock_irqsave(&say_lock, flags);
if (cannot_schedule())
return res;
down_read(&say_mutex);
for (ch = channel_list; ch; ch = ch->ch_next) {
int i;
for (i = 0; i < ch->ch_id_max; i++) {
@ -166,7 +174,7 @@ struct say_channel *find_channel(const void *id)
}
}
found:
read_unlock_irqrestore(&say_lock, flags);
up_read(&say_mutex);
return res;
}
@ -188,9 +196,11 @@ void _remove_binding(struct task_struct *whom)
void bind_to_channel(struct say_channel *ch, struct task_struct *whom)
{
int i;
unsigned long flags;
write_lock_irqsave(&say_lock, flags);
if (cannot_schedule())
return;
down_write(&say_mutex);
_remove_binding(whom);
for (i = 0; i < ch->ch_id_max; i++) {
if (!ch->ch_ids[i]) {
@ -205,11 +215,11 @@ void bind_to_channel(struct say_channel *ch, struct task_struct *whom)
}
done:
write_unlock_irqrestore(&say_lock, flags);
up_write(&say_mutex);
return;
err:
write_unlock_irqrestore(&say_lock, flags);
up_write(&say_mutex);
say_to(default_channel, SAY_ERROR, "ID overflow for thread '%s'\n", whom->comm);
}
@ -219,9 +229,11 @@ struct say_channel *get_binding(struct task_struct *whom)
{
struct say_channel *ch;
int i;
unsigned long flags;
read_lock_irqsave(&say_lock, flags);
if (cannot_schedule())
return NULL;
down_read(&say_mutex);
for (ch = channel_list; ch; ch = ch->ch_next) {
for (i = 0; i < ch->ch_id_max; i++) {
if (ch->ch_ids[i] == whom) {
@ -231,7 +243,7 @@ struct say_channel *get_binding(struct task_struct *whom)
}
ch = NULL;
found:
read_unlock_irqrestore(&say_lock, flags);
up_read(&say_mutex);
return ch;
}
EXPORT_SYMBOL_GPL(get_binding);
@ -240,9 +252,11 @@ void remove_binding_from(struct say_channel *ch, struct task_struct *whom)
{
bool found = false;
int i;
unsigned long flags;
write_lock_irqsave(&say_lock, flags);
if (cannot_schedule())
return;
down_write(&say_mutex);
for (i = 0; i < ch->ch_id_max; i++) {
if (ch->ch_ids[i] == whom) {
ch->ch_ids[i] = NULL;
@ -253,17 +267,18 @@ void remove_binding_from(struct say_channel *ch, struct task_struct *whom)
if (!found) {
_remove_binding(whom);
}
write_unlock_irqrestore(&say_lock, flags);
up_write(&say_mutex);
}
EXPORT_SYMBOL_GPL(remove_binding_from);
void remove_binding(struct task_struct *whom)
{
unsigned long flags;
if (cannot_schedule())
return;
write_lock_irqsave(&say_lock, flags);
down_write(&say_mutex);
_remove_binding(whom);
write_unlock_irqrestore(&say_lock, flags);
up_write(&say_mutex);
}
EXPORT_SYMBOL_GPL(remove_binding);
@ -280,13 +295,15 @@ EXPORT_SYMBOL_GPL(rollover_channel);
void rollover_all(void)
{
struct say_channel *ch;
unsigned long flags;
read_lock_irqsave(&say_lock, flags);
if (cannot_schedule())
return;
down_read(&say_mutex);
for (ch = channel_list; ch; ch = ch->ch_next) {
ch->ch_rollover = true;
}
read_unlock_irqrestore(&say_lock, flags);
up_read(&say_mutex);
}
EXPORT_SYMBOL_GPL(rollover_all);
@ -309,19 +326,20 @@ void _del_channel(struct say_channel *ch)
struct say_channel *tmp;
struct say_channel **_tmp;
int i, j;
unsigned long flags;
if (!ch)
return;
if (cannot_schedule())
return;
write_lock_irqsave(&say_lock, flags);
down_write(&say_mutex);
for (_tmp = &channel_list; (tmp = *_tmp) != NULL; _tmp = &tmp->ch_next) {
if (tmp == ch) {
*_tmp = tmp->ch_next;
break;
}
}
write_unlock_irqrestore(&say_lock, flags);
up_write(&say_mutex);
for (i = 0; i < MAX_SAY_CLASS; i++) {
for (j = 0; j < 2; j++) {
@ -352,8 +370,6 @@ struct say_channel *_make_channel(const char *name, bool must_exist)
struct say_channel *res = NULL;
struct kstat kstat = {};
int i, j;
bool use_atomic = (preempt_count() & (SOFTIRQ_MASK | HARDIRQ_MASK | NMI_MASK)) != 0 || in_atomic() || irqs_disabled();
unsigned long mode = use_atomic ? GFP_ATOMIC : GFP_BRICK;
mm_segment_t oldfs;
bool is_dir = false;
int status;
@ -373,7 +389,7 @@ struct say_channel *_make_channel(const char *name, bool must_exist)
}
restart:
res = kzalloc(sizeof(struct say_channel), mode);
res = kzalloc(sizeof(struct say_channel), GFP_BRICK);
if (unlikely(!res)) {
cond_resched();
goto restart;
@ -383,7 +399,7 @@ restart:
res->ch_is_dir = is_dir;
init_waitqueue_head(&res->ch_progress);
restart2:
res->ch_name = kstrdup(name, mode);
res->ch_name = kstrdup(name, GFP_BRICK);
if (unlikely(!res->ch_name)) {
cond_resched();
goto restart2;
@ -394,7 +410,7 @@ restart2:
for (j = 0; j < 2; j++) {
char *buf;
restart3:
buf = (void*)__get_free_pages(mode, SAY_ORDER);
buf = (void*)__get_free_pages(GFP_BRICK, SAY_ORDER);
if (unlikely(!buf)) {
cond_resched();
goto restart3;
@ -411,23 +427,27 @@ struct say_channel *make_channel(const char *name, bool must_exist)
{
struct say_channel *res = NULL;
struct say_channel *ch;
unsigned long flags;
read_lock_irqsave(&say_lock, flags);
if (cannot_schedule()) {
printk(KERN_ERR "trying to make channel in atomic\n");
return NULL;
}
down_read(&say_mutex);
for (ch = channel_list; ch; ch = ch->ch_next) {
if (!strcmp(ch->ch_name, name)) {
res = ch;
break;
}
}
read_unlock_irqrestore(&say_lock, flags);
up_read(&say_mutex);
if (unlikely(!res)) {
res = _make_channel(name, must_exist);
if (unlikely(!res))
goto done;
write_lock_irqsave(&say_lock, flags);
down_write(&say_mutex);
for (ch = channel_list; ch; ch = ch->ch_next) {
if (ch != res && unlikely(!strcmp(ch->ch_name, name))) {
@ -441,7 +461,7 @@ struct say_channel *make_channel(const char *name, bool must_exist)
channel_list = res;
race_found:
write_unlock_irqrestore(&say_lock, flags);
up_write(&say_mutex);
}
done:
@ -508,7 +528,7 @@ void say_to(struct say_channel *ch, int class, const char *fmt, ...)
ch = find_channel(current);
}
if (likely(ch)) {
if (ch && ch != default_channel) {
if (!ch->ch_is_dir)
class = SAY_TOTAL;
if (likely(class >= 0 && class < MAX_SAY_CLASS)) {
@ -814,8 +834,6 @@ void treat_channel(struct say_channel *ch, int class)
static
int _say_thread(void *data)
{
unsigned long flags;
while (!kthread_should_stop()) {
struct say_channel *ch;
int i;
@ -824,36 +842,36 @@ int _say_thread(void *data)
say_dirty = false;
restart_rollover:
read_lock_irqsave(&say_lock, flags);
down_read(&say_mutex);
for (ch = channel_list; ch; ch = ch->ch_next) {
if (ch->ch_rollover && ch->ch_status_written > 0) {
read_unlock_irqrestore(&say_lock, flags);
up_read(&say_mutex);
_rollover_channel(ch);
goto restart_rollover;
}
}
read_unlock_irqrestore(&say_lock, flags);
up_read(&say_mutex);
restart:
read_lock_irqsave(&say_lock, flags);
down_read(&say_mutex);
for (ch = channel_list; ch; ch = ch->ch_next) {
int start = 0;
if (!ch->ch_is_dir)
start = SAY_TOTAL;
for (i = start; i < MAX_SAY_CLASS; i++) {
if (ch->ch_index[i] > 0) {
read_unlock_irqrestore(&say_lock, flags);
up_read(&say_mutex);
treat_channel(ch, i);
goto restart;
}
}
if (ch->ch_delete) {
read_unlock_irqrestore(&say_lock, flags);
up_read(&say_mutex);
_del_channel(ch);
goto restart;
}
}
read_unlock_irqrestore(&say_lock, flags);
up_read(&say_mutex);
}
return 0;

View File

@ -126,6 +126,8 @@ EXPORT_SYMBOL_GPL(get_lamport);
void set_lamport(struct timespec *lamport_old)
{
protect_timespec(lamport_old);
/* Always advance the internal Lamport timestamp a little bit
* in order to ensure strict monotonicity between set_lamport() calls.
*/
@ -140,6 +142,8 @@ EXPORT_SYMBOL_GPL(set_lamport);
void set_lamport_nonstrict(struct timespec *lamport_old)
{
protect_timespec(lamport_old);
/* Speculate that advancing is not necessary, to avoid the lock
*/
if (timespec_compare(lamport_old, &lamport_stamp) > 0) {
@ -159,6 +163,8 @@ void set_get_lamport(struct timespec *lamport_old, struct timespec *real_now, st
{
struct timespec _real_now;
protect_timespec(lamport_old);
down_write(&lamport_sem);
if (timespec_compare(lamport_old, &lamport_stamp) > 0)
*lamport_now = *lamport_old;
@ -175,3 +181,26 @@ void set_get_lamport(struct timespec *lamport_old, struct timespec *real_now, st
*lamport_now = _real_now;
}
EXPORT_SYMBOL_GPL(set_get_lamport);
/* Protect against illegal values, e.g. from corrupt filesystems etc.
*/
int max_lamport_future = 30 * 24 * 3600;
bool protect_timespec(struct timespec *check)
{
struct timespec limit = CURRENT_TIME;
bool res = false;
limit.tv_sec += max_lamport_future;
if (unlikely(check->tv_sec >= limit.tv_sec)) {
down_write(&lamport_sem);
timespec_add_ns(&lamport_stamp, 1);
memcpy(check, &lamport_stamp, sizeof(*check));
if (unlikely(check->tv_sec > limit.tv_sec))
max_lamport_future += check->tv_sec - limit.tv_sec;
up_write(&lamport_sem);
res = true;
}
return res;
}

View File

@ -52,4 +52,10 @@ extern void set_lamport_nonstrict(struct timespec *lamport_old);
*/
extern void set_get_lamport(struct timespec *lamport_old, struct timespec *real_now, struct timespec *lamport_now);
/* Protect against illegal values, e.g. from corrupt filesystems etc.
*/
extern int max_lamport_future;
extern bool protect_timespec(struct timespec *check);
#endif

View File

@ -154,6 +154,8 @@ struct mapfree_info *mapfree_get(const char *name, int flags)
for (;;) {
struct address_space *mapping;
struct inode *inode;
loff_t length;
int i;
int ra = 1;
int prot = 0600;
mm_segment_t oldfs;
@ -205,7 +207,12 @@ struct mapfree_info *mapfree_get(const char *name, int flags)
mapping_set_gfp_mask(mapping, mapping_gfp_mask(mapping) & ~(__GFP_IO | __GFP_FS));
mf->mf_max = i_size_read(inode);
length = i_size_read(inode);
mf->mf_max = length;
for (i = 0; i < DIRTY_MAX; i++) {
rwlock_init(&mf->mf_length[i].dl_lock);
mf->mf_length[i].dl_length = length;
}
if (S_ISBLK(inode->i_mode)) {
MARS_INF("changing blkdev readahead from %lu to %d\n", inode->i_bdev->bd_disk->queue->backing_dev_info.ra_pages, ra);
@ -295,70 +302,71 @@ int mapfree_thread(void *data)
return 0;
}
////////////////// dirty IOs in append mode //////////////////
static
struct dirty_length *_get_dl(struct mapfree_info *mf, enum dirty_stage stage)
{
#ifdef MARS_DEBUGGING
if (unlikely(stage < 0)) {
MARS_ERR("bad stage=%d\n", stage);
stage = 0;
}
if (unlikely(stage >= DIRTY_MAX)) {
MARS_ERR("bad stage=%d\n", stage);
stage = DIRTY_MAX - 1;
}
#endif
return &mf->mf_length[stage];
}
void mf_dirty_append(struct mapfree_info *mf, enum dirty_stage stage, loff_t newlen)
{
struct dirty_length *dl = _get_dl(mf, stage);
unsigned long flags;
traced_writelock(&dl->dl_lock, flags);
if (dl->dl_length < newlen)
dl->dl_length = newlen;
traced_writeunlock(&dl->dl_lock, flags);
}
loff_t mf_dirty_length(struct mapfree_info *mf, enum dirty_stage stage)
{
struct dirty_length *dl = _get_dl(mf, stage);
#ifdef CONFIG_64BIT
/* Avoid locking by assuming that 64bit reads are atomic in itself */
smp_read_barrier_depends();
return ACCESS_ONCE(dl->dl_length);
#else /* cannot rely on atomic read of two 32bit values */
loff_t res;
unsigned long flags;
traced_readlock(&dl->dl_lock, flags);
res = dl->dl_length;
traced_readunlock(&dl->dl_lock, flags);
return res;
#endif
}
////////////////// dirty IOs on the fly //////////////////
void mf_insert_dirty(struct mapfree_info *mf, struct dirty_info *di)
{
if (likely(di->dirty_mref && mf)) {
down_write(&mf->mf_mutex);
list_del(&di->dirty_head);
list_add(&di->dirty_head, &mf->mf_dirty_anchor);
up_write(&mf->mf_mutex);
}
}
EXPORT_SYMBOL_GPL(mf_insert_dirty);
void mf_remove_dirty(struct mapfree_info *mf, struct dirty_info *di)
{
if (!list_empty(&di->dirty_head) && mf) {
down_write(&mf->mf_mutex);
list_del_init(&di->dirty_head);
up_write(&mf->mf_mutex);
}
}
EXPORT_SYMBOL_GPL(mf_remove_dirty);
void mf_get_dirty(struct mapfree_info *mf, loff_t *min, loff_t *max, int min_stage, int max_stage)
{
struct list_head *tmp;
if (unlikely(!mf))
goto done;
down_read(&mf->mf_mutex);
for (tmp = mf->mf_dirty_anchor.next; tmp != &mf->mf_dirty_anchor; tmp = tmp->next) {
struct dirty_info *di = container_of(tmp, struct dirty_info, dirty_head);
struct mref_object *mref = di->dirty_mref;
if (unlikely(!mref)) {
continue;
}
if (di->dirty_stage < min_stage || di->dirty_stage > max_stage) {
continue;
}
if (mref->ref_pos < *min) {
*min = mref->ref_pos;
}
if (mref->ref_pos + mref->ref_len > *max) {
*max = mref->ref_pos + mref->ref_len;
}
}
up_read(&mf->mf_mutex);
done:;
}
EXPORT_SYMBOL_GPL(mf_get_dirty);
void mf_get_any_dirty(const char *filename, loff_t *min, loff_t *max, int min_stage, int max_stage)
loff_t mf_get_any_dirty(const char *filename, int stage)
{
loff_t res = -1;
struct list_head *tmp;
down_read(&mapfree_mutex);
for (tmp = mapfree_list.next; tmp != &mapfree_list; tmp = tmp->next) {
struct mapfree_info *mf = container_of(tmp, struct mapfree_info, mf_head);
if (!strcmp(mf->mf_name, filename)) {
mf_get_dirty(mf, min, max, min_stage, max_stage);
res = mf_dirty_length(mf, stage);
break;
}
}
up_read(&mapfree_mutex);
return res;
}
EXPORT_SYMBOL_GPL(mf_get_any_dirty);

View File

@ -47,6 +47,19 @@
extern int mapfree_period_sec;
extern int mapfree_grace_keep_mb;
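/* Assumed lifecycle, as used by the aio/sio bricks: a write is accounted
 * as DIRTY_SUBMITTED when handed to the IO engine, advances to
 * DIRTY_COMPLETED when the IO (plus any fdsync) has completed, and to
 * DIRTY_FINISHED after the completion callbacks have run.
 */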
enum dirty_stage {
DIRTY_SUBMITTED,
DIRTY_COMPLETED,
DIRTY_FINISHED,
/* Keep this the last element */
DIRTY_MAX
};
struct dirty_length {
rwlock_t dl_lock;
loff_t dl_length;
};
struct mapfree_info {
struct list_head mf_head;
struct list_head mf_dirty_anchor;
@ -60,12 +73,7 @@ struct mapfree_info {
loff_t mf_last;
loff_t mf_max;
long long mf_jiffies;
};
struct dirty_info {
struct list_head dirty_head;
struct mref_object *dirty_mref;
int dirty_stage;
struct dirty_length mf_length[DIRTY_MAX];
};
struct mapfree_info *mapfree_get(const char *filename, int flags);
@ -76,12 +84,14 @@ void mapfree_set(struct mapfree_info *mf, loff_t min, loff_t max);
void mapfree_pages(struct mapfree_info *mf, int grace_keep);
////////////////// dirty IOs in append mode //////////////////
void mf_dirty_append(struct mapfree_info *mf, enum dirty_stage stage, loff_t newlen);
loff_t mf_dirty_length(struct mapfree_info *mf, enum dirty_stage stage);
////////////////// dirty IOs on the fly //////////////////
void mf_insert_dirty(struct mapfree_info *mf, struct dirty_info *di);
void mf_remove_dirty(struct mapfree_info *mf, struct dirty_info *di);
void mf_get_dirty(struct mapfree_info *mf, loff_t *min, loff_t *max, int min_stage, int max_stage);
void mf_get_any_dirty(const char *filename, loff_t *min, loff_t *max, int min_stage, int max_stage);
loff_t mf_get_any_dirty(const char *filename, int stage);
////////////////// module init stuff /////////////////////////

View File

@ -95,7 +95,6 @@ EXPORT_SYMBOL_GPL(aio_sync_mode);
static inline
void _enqueue(struct aio_threadinfo *tinfo, struct aio_mref_aspect *mref_a, int prio, bool at_end)
{
unsigned long flags;
#if 1
prio++;
if (unlikely(prio < 0)) {
@ -109,7 +108,7 @@ void _enqueue(struct aio_threadinfo *tinfo, struct aio_mref_aspect *mref_a, int
mref_a->enqueue_stamp = cpu_clock(raw_smp_processor_id());
traced_lock(&tinfo->lock, flags);
mutex_lock(&tinfo->mutex);
if (at_end) {
list_add_tail(&mref_a->io_head, &tinfo->mref_list[prio]);
@ -119,7 +118,7 @@ void _enqueue(struct aio_threadinfo *tinfo, struct aio_mref_aspect *mref_a, int
tinfo->queued[prio]++;
atomic_inc(&tinfo->queued_sum);
traced_unlock(&tinfo->lock, flags);
mutex_unlock(&tinfo->mutex);
atomic_inc(&tinfo->total_enqueue_count);
@ -131,9 +130,8 @@ struct aio_mref_aspect *_dequeue(struct aio_threadinfo *tinfo)
{
struct aio_mref_aspect *mref_a = NULL;
int prio;
unsigned long flags = 0;
traced_lock(&tinfo->lock, flags);
mutex_lock(&tinfo->mutex);
for (prio = 0; prio < MARS_PRIO_NR; prio++) {
struct list_head *start = &tinfo->mref_list[prio];
@ -148,7 +146,7 @@ struct aio_mref_aspect *_dequeue(struct aio_threadinfo *tinfo)
}
done:
traced_unlock(&tinfo->lock, flags);
mutex_unlock(&tinfo->mutex);
if (likely(mref_a && mref_a->object)) {
unsigned long long latency;
@ -163,27 +161,6 @@ done:
static
loff_t get_total_size(struct aio_output *output)
{
struct file *file;
struct inode *inode;
loff_t min;
file = output->mf->mf_filp;
if (unlikely(!file)) {
MARS_ERR("file is not open\n");
return -EILSEQ;
}
if (unlikely(!file->f_mapping)) {
MARS_ERR("file %p has no mapping\n", file);
return -EILSEQ;
}
inode = file->f_mapping->host;
if (unlikely(!inode)) {
MARS_ERR("file %p has no inode\n", file);
return -EILSEQ;
}
min = i_size_read(inode);
/* Workaround for races in the page cache.
* It appears that concurrent reads and writes seem to
* result in inconsistent reads in some very rare cases, due to
@ -191,12 +168,7 @@ loff_t get_total_size(struct aio_output *output)
* appended by a write operation, but the data has not actually hit
* the page cache, such that a concurrent read gets NULL blocks.
*/
if (!output->brick->is_static_device) {
loff_t max = 0;
mf_get_dirty(output->mf, &min, &max, 0, 99);
}
return min;
return mf_dirty_length(output->mf, DIRTY_COMPLETED);
}
static int aio_ref_get(struct aio_output *output, struct mref_object *mref)
@ -300,13 +272,12 @@ void _complete(struct aio_output *output, struct aio_mref_aspect *mref_a, int er
done:
if (mref->ref_rw) {
mf_dirty_append(output->mf, DIRTY_FINISHED, mref->ref_pos + mref->ref_len);
atomic_dec(&output->write_count);
} else {
atomic_dec(&output->read_count);
}
mf_remove_dirty(output->mf, &mref_a->di);
aio_ref_put(output, mref);
atomic_dec(&output->work_count);
atomic_dec(&mars_global_io_flying);
@ -341,7 +312,6 @@ void _complete_all(struct list_head *tmp_list, struct aio_output *output, int er
struct list_head *tmp = tmp_list->next;
struct aio_mref_aspect *mref_a = container_of(tmp, struct aio_mref_aspect, io_head);
list_del_init(tmp);
mref_a->di.dirty_stage = 3;
_complete(output, mref_a, err);
}
}
@ -476,7 +446,7 @@ int aio_start_thread(
INIT_LIST_HEAD(&tinfo->mref_list[j]);
}
tinfo->output = output;
spin_lock_init(&tinfo->lock);
mutex_init(&tinfo->mutex);
init_waitqueue_head(&tinfo->event);
init_waitqueue_head(&tinfo->terminate_event);
tinfo->should_terminate = false;
@ -591,7 +561,6 @@ int aio_sync_thread(void *data)
while (!tinfo->should_terminate || atomic_read(&tinfo->queued_sum) > 0) {
LIST_HEAD(tmp_list);
unsigned long flags;
int i;
output->fdsync_active = false;
@ -602,7 +571,7 @@ int aio_sync_thread(void *data)
atomic_read(&tinfo->queued_sum) > 0,
HZ / 4);
traced_lock(&tinfo->lock, flags);
mutex_lock(&tinfo->mutex);
for (i = 0; i < MARS_PRIO_NR; i++) {
struct list_head *start = &tinfo->mref_list[i];
if (!list_empty(start)) {
@ -613,7 +582,7 @@ int aio_sync_thread(void *data)
break;
}
}
traced_unlock(&tinfo->lock, flags);
mutex_unlock(&tinfo->mutex);
if (!list_empty(&tmp_list)) {
aio_sync_all(output, &tmp_list);
@ -687,12 +656,13 @@ static int aio_event_thread(void *data)
if (!mref_a) {
continue; // this was a dummy request
}
mref_a->di.dirty_stage = 2;
mref = mref_a->object;
MARS_IO("AIO done %p pos = %lld len = %d rw = %d\n", mref, mref->ref_pos, mref->ref_len, mref->ref_rw);
mapfree_set(output->mf, mref->ref_pos, mref->ref_pos + mref->ref_len);
if (mref->ref_rw)
mf_dirty_append(output->mf, DIRTY_COMPLETED, mref->ref_pos + mref->ref_len);
if (output->brick->o_fdsync
&& err >= 0
@ -713,7 +683,6 @@ static int aio_event_thread(void *data)
continue;
}
mref_a->di.dirty_stage = 3;
_complete(output, mref_a, err);
}
@ -915,9 +884,8 @@ static int aio_submit_thread(void *data)
mapfree_set(output->mf, mref->ref_pos, -1);
mref_a->di.dirty_stage = 0;
if (mref->ref_rw) {
mf_insert_dirty(output->mf, &mref_a->di);
mf_dirty_append(output->mf, DIRTY_SUBMITTED, mref->ref_pos + mref->ref_len);
}
mref->ref_total_size = get_total_size(output);
@ -950,13 +918,11 @@ static int aio_submit_thread(void *data)
sleeptime = 1;
for (;;) {
mref_a->di.dirty_stage = 1;
status = aio_submit(output, mref_a, false);
if (likely(status != -EAGAIN)) {
break;
}
mref_a->di.dirty_stage = 0;
atomic_inc(&output->total_delay_count);
brick_msleep(sleeptime);
if (sleeptime < 100) {
@ -1098,15 +1064,12 @@ static int aio_mref_aspect_init_fn(struct generic_aspect *_ini)
{
struct aio_mref_aspect *ini = (void*)_ini;
INIT_LIST_HEAD(&ini->io_head);
INIT_LIST_HEAD(&ini->di.dirty_head);
ini->di.dirty_mref = ini->object;
return 0;
}
static void aio_mref_aspect_exit_fn(struct generic_aspect *_ini)
{
struct aio_mref_aspect *ini = (void*)_ini;
CHECK_HEAD_EMPTY(&ini->di.dirty_head);
CHECK_HEAD_EMPTY(&ini->io_head);
}

View File

@ -49,7 +49,6 @@ extern int aio_sync_mode;
struct aio_mref_aspect {
GENERIC_ASPECT(mref);
struct list_head io_head;
struct dirty_info di;
unsigned long long enqueue_stamp;
long long start_jiffies;
int resubmit;
@ -76,7 +75,7 @@ struct aio_threadinfo {
struct task_struct *thread;
wait_queue_head_t event;
wait_queue_head_t terminate_event;
spinlock_t lock;
struct mutex mutex;
int queued[MARS_PRIO_NR];
atomic_t queued_sum;
atomic_t total_enqueue_count;

View File

@ -59,7 +59,7 @@ void _do_resubmit(struct client_channel *ch)
{
struct client_output *output = ch->output;
spin_lock(&output->lock);
mutex_lock(&output->mutex);
if (!list_empty(&ch->wait_list)) {
struct list_head *first = ch->wait_list.next;
struct list_head *last = ch->wait_list.prev;
@ -69,7 +69,7 @@ void _do_resubmit(struct client_channel *ch)
list_connect(last, old_start);
INIT_LIST_HEAD(&ch->wait_list);
}
spin_unlock(&output->lock);
mutex_unlock(&output->mutex);
}
static
@ -455,17 +455,16 @@ static
void _hash_insert(struct client_output *output, struct client_mref_aspect *mref_a)
{
struct mref_object *mref = mref_a->object;
unsigned long flags;
int hash_index;
traced_lock(&output->lock, flags);
mutex_lock(&output->mutex);
list_del(&mref_a->io_head);
list_add_tail(&mref_a->io_head, &output->mref_list);
list_del(&mref_a->hash_head);
mref->ref_id = ++output->last_id;
hash_index = mref->ref_id % CLIENT_HASH_MAX;
list_add_tail(&mref_a->hash_head, &output->hash_table[hash_index]);
traced_unlock(&output->lock, flags);
mutex_unlock(&output->mutex);
}
static void client_ref_io(struct client_output *output, struct mref_object *mref)
@ -522,7 +521,6 @@ int receiver_thread(void *data)
struct list_head *tmp;
struct client_mref_aspect *mref_a = NULL;
struct mref_object *mref = NULL;
unsigned long flags;
if (ch->recv_error) {
/* The protocol may be out of sync.
@ -563,7 +561,7 @@ int receiver_thread(void *data)
{
int hash_index = cmd.cmd_int1 % CLIENT_HASH_MAX;
traced_lock(&output->lock, flags);
mutex_lock(&output->mutex);
for (tmp = output->hash_table[hash_index].next; tmp != &output->hash_table[hash_index]; tmp = tmp->next) {
struct mref_object *tmp_mref;
mref_a = container_of(tmp, struct client_mref_aspect, hash_head);
@ -577,11 +575,11 @@ int receiver_thread(void *data)
break;
err:
traced_unlock(&output->lock, flags);
mutex_unlock(&output->mutex);
status = -EBADR;
goto done;
}
traced_unlock(&output->lock, flags);
mutex_unlock(&output->mutex);
if (unlikely(!mref)) {
MARS_WRN("got unknown callback id %d on '%s' @%s\n",
@ -671,7 +669,6 @@ void _do_timeout(struct client_output *output, struct list_head *anchor, int *ro
struct list_head *next;
LIST_HEAD(tmp_list);
long io_timeout = _compute_timeout(brick);
unsigned long flags;
int i;
if (list_empty(anchor))
@ -699,7 +696,7 @@ void _do_timeout(struct client_output *output, struct list_head *anchor, int *ro
io_timeout *= HZ;
traced_lock(&output->lock, flags);
mutex_lock(&output->mutex);
for (tmp = anchor->next, next = tmp->next; tmp != anchor; tmp = next, next = tmp->next) {
struct client_mref_aspect *mref_a;
@ -714,7 +711,7 @@ void _do_timeout(struct client_output *output, struct list_head *anchor, int *ro
list_del_init(&mref_a->io_head);
list_add_tail(&mref_a->tmp_head, &tmp_list);
}
traced_unlock(&output->lock, flags);
mutex_unlock(&output->mutex);
while (!list_empty(&tmp_list)) {
struct client_mref_aspect *mref_a;
@ -778,7 +775,6 @@ static int sender_thread(void *data)
bool old_cork;
int ch_skip = max_client_bulk;
int status = -ESHUTDOWN;
unsigned long flags;
if (atomic_inc_return(&sender_count) == 1)
mars_limit_reset(&client_limiter);
@ -830,10 +826,10 @@ static int sender_thread(void *data)
/* Grab the next mref from the queue
*/
traced_lock(&output->lock, flags);
mutex_lock(&output->mutex);
tmp = output->mref_list.next;
if (tmp == &output->mref_list) {
traced_unlock(&output->lock, flags);
mutex_unlock(&output->mutex);
MARS_DBG("empty %d %d\n", output->get_info, brick_thread_should_stop());
do_timeout = true;
continue;
@ -841,7 +837,7 @@ static int sender_thread(void *data)
list_del_init(tmp);
// notice: hash_head remains in its list!
cork = !list_empty(&output->mref_list);
traced_unlock(&output->lock, flags);
mutex_unlock(&output->mutex);
mref_a = container_of(tmp, struct client_mref_aspect, io_head);
mref = mref_a->object;
@ -886,10 +882,10 @@ static int sender_thread(void *data)
ch_skip = max_client_bulk;
}
spin_lock(&output->lock);
mutex_lock(&output->mutex);
list_add(tmp, &ch->wait_list);
// notice: hash_head is already there!
spin_unlock(&output->lock);
mutex_unlock(&output->mutex);
status = mars_send_mref(&ch->socket, mref, cork);
old_cork = cork;
@ -1050,8 +1046,7 @@ static int client_output_construct(struct client_output *output)
}
init_waitqueue_head(&output->bundle.sender_event);
spin_lock_init(&output->lock);
mutex_init(&output->mutex);
INIT_LIST_HEAD(&output->mref_list);
init_waitqueue_head(&output->info_event);
return 0;

View File

@ -92,7 +92,7 @@ struct client_output {
MARS_OUTPUT(client);
atomic_t fly_count;
atomic_t timeout_count;
spinlock_t lock;
struct mutex mutex;
struct list_head mref_list;
int last_id;
struct client_bundle bundle;

View File

@ -402,9 +402,13 @@ void _sio_ref_io(struct sio_threadinfo *tinfo, struct mref_object *mref)
if (mref->ref_rw == READ) {
status = read_aops(output, mref);
} else {
mf_dirty_append(output->mf, DIRTY_SUBMITTED, mref->ref_pos + mref->ref_len);
status = write_aops(output, mref);
if (barrier || output->brick->o_fdsync)
sync_file(output);
if (status >= 0) {
if (barrier || output->brick->o_fdsync)
sync_file(output);
mf_dirty_append(output->mf, DIRTY_COMPLETED, mref->ref_pos + mref->ref_len);
}
}
mapfree_set(output->mf, mref->ref_pos, mref->ref_pos + mref->ref_len);
@ -413,6 +417,8 @@ done:
_complete(output, mref, status);
atomic_dec(&tinfo->fly_count);
if (mref->ref_rw && status >= 0)
mf_dirty_append(output->mf, DIRTY_FINISHED, mref->ref_pos + mref->ref_len);
}
/* This is called from outside

View File

@ -108,7 +108,7 @@ int trans_logger_replay_timeout = 1; // in s
EXPORT_SYMBOL_GPL(trans_logger_replay_timeout);
struct writeback_group global_writeback = {
.lock = __RW_LOCK_UNLOCKED(global_writeback.lock),
.mutex = __RWSEM_INITIALIZER(global_writeback.mutex),
.group_anchor = LIST_HEAD_INIT(global_writeback.group_anchor),
.until_percent = 30,
};
@ -117,22 +117,18 @@ EXPORT_SYMBOL_GPL(global_writeback);
static
void add_to_group(struct writeback_group *gr, struct trans_logger_brick *brick)
{
unsigned long flags;
write_lock_irqsave(&gr->lock, flags);
down_write(&gr->mutex);
list_add_tail(&brick->group_head, &gr->group_anchor);
write_unlock_irqrestore(&gr->lock, flags);
up_write(&gr->mutex);
}
static
void remove_from_group(struct writeback_group *gr, struct trans_logger_brick *brick)
{
unsigned long flags;
write_lock_irqsave(&gr->lock, flags);
down_write(&gr->mutex);
list_del_init(&brick->group_head);
gr->leader = NULL;
write_unlock_irqrestore(&gr->lock, flags);
up_write(&gr->mutex);
}
static
@ -140,7 +136,6 @@ struct trans_logger_brick *elect_leader(struct writeback_group *gr)
{
struct trans_logger_brick *res = gr->leader;
struct list_head *tmp;
unsigned long flags;
if (res && gr->until_percent >= 0) {
loff_t used = atomic64_read(&res->shadow_mem_used);
@ -148,7 +143,8 @@ struct trans_logger_brick *elect_leader(struct writeback_group *gr)
goto done;
}
read_lock_irqsave(&gr->lock, flags);
/* FIXME: use O(log n) data structure instead */
down_read(&gr->mutex);
for (tmp = gr->group_anchor.next; tmp != &gr->group_anchor; tmp = tmp->next) {
struct trans_logger_brick *test = container_of(tmp, struct trans_logger_brick, group_head);
loff_t new_used = atomic64_read(&test->shadow_mem_used);
@ -158,7 +154,7 @@ struct trans_logger_brick *elect_leader(struct writeback_group *gr)
gr->biggest = new_used;
}
}
read_unlock_irqrestore(&gr->lock, flags);
up_read(&gr->mutex);
gr->leader = res;
@ -2712,7 +2708,6 @@ void replay_endio(struct generic_callback *cb)
struct trans_logger_mref_aspect *mref_a = cb->cb_private;
struct trans_logger_brick *brick;
bool ok;
unsigned long flags;
_crashme(22, false);
@ -2726,10 +2721,10 @@ void replay_endio(struct generic_callback *cb)
MARS_ERR("IO error = %d\n", cb->cb_error);
}
traced_lock(&brick->replay_lock, flags);
down_write(&brick->replay_mutex);
ok = !list_empty(&mref_a->replay_head);
list_del_init(&mref_a->replay_head);
traced_unlock(&brick->replay_lock, flags);
up_write(&brick->replay_mutex);
if (likely(ok)) {
atomic_dec(&brick->replay_count);
@ -2749,11 +2744,8 @@ bool _has_conflict(struct trans_logger_brick *brick, struct trans_logger_mref_as
struct mref_object *mref = mref_a->object;
struct list_head *tmp;
bool res = false;
unsigned long flags;
// NOTE: replacing this by rwlock_t will not gain anything, because there exists at most 1 reader at any time
traced_lock(&brick->replay_lock, flags);
down_read(&brick->replay_mutex);
for (tmp = brick->replay_list.next; tmp != &brick->replay_list; tmp = tmp->next) {
struct trans_logger_mref_aspect *tmp_a;
@ -2767,7 +2759,7 @@ bool _has_conflict(struct trans_logger_brick *brick, struct trans_logger_mref_as
}
}
traced_unlock(&brick->replay_lock, flags);
up_read(&brick->replay_mutex);
return res;
}
@ -2778,7 +2770,6 @@ void wait_replay(struct trans_logger_brick *brick, struct trans_logger_mref_aspe
int conflicts = 0;
bool ok = false;
bool was_empty;
unsigned long flags;
wait_event_interruptible_timeout(brick->worker_event,
atomic_read(&brick->replay_count) < max
@ -2789,7 +2780,7 @@ void wait_replay(struct trans_logger_brick *brick, struct trans_logger_mref_aspe
if (conflicts)
atomic_inc(&brick->total_replay_conflict_count);
traced_lock(&brick->replay_lock, flags);
down_write(&brick->replay_mutex);
was_empty = !!list_empty(&mref_a->replay_head);
if (likely(was_empty)) {
atomic_inc(&brick->replay_count);
@ -2797,7 +2788,7 @@ void wait_replay(struct trans_logger_brick *brick, struct trans_logger_mref_aspe
list_del(&mref_a->replay_head);
}
list_add(&mref_a->replay_head, &brick->replay_list);
traced_unlock(&brick->replay_lock, flags);
up_write(&brick->replay_mutex);
if (unlikely(!was_empty)) {
MARS_ERR("replay_head was already used (ok=%d, conflicts=%d, replay_count=%d)\n", ok, conflicts, atomic_read(&brick->replay_count));
@ -3395,7 +3386,7 @@ int trans_logger_brick_construct(struct trans_logger_brick *brick)
}
atomic_set(&brick->hash_count, 0);
spin_lock_init(&brick->replay_lock);
init_rwsem(&brick->replay_mutex);
INIT_LIST_HEAD(&brick->replay_list);
INIT_LIST_HEAD(&brick->group_head);
init_waitqueue_head(&brick->worker_event);

View File

@ -53,7 +53,7 @@ extern atomic_t global_mshadow_count;
extern atomic64_t global_mshadow_used;
struct writeback_group {
rwlock_t lock;
struct rw_semaphore mutex;
struct trans_logger_brick *leader;
loff_t biggest;
struct list_head group_anchor;
@ -187,7 +187,7 @@ struct trans_logger_brick {
struct trans_logger_hash_anchor **hash_table;
struct list_head group_head;
loff_t old_margin;
spinlock_t replay_lock;
struct rw_semaphore replay_mutex;
struct list_head replay_list;
struct task_struct *thread;
wait_queue_head_t worker_event;

View File

@ -625,6 +625,7 @@ struct mars_rotate {
int relevant_serial;
int replay_code;
int avoid_count;
int old_open_count;
bool has_symlinks;
bool is_attached;
bool res_shutdown;
@ -642,7 +643,7 @@ struct mars_rotate {
bool is_log_damaged;
bool has_emergency;
bool log_is_really_damaged;
spinlock_t inf_lock;
struct mutex inf_mutex;
bool infs_is_dirty[MAX_INFOS];
struct trans_logger_info infs[MAX_INFOS];
struct key_value_pair msgs[sizeof(rot_keys) / sizeof(char*)];
@ -1316,7 +1317,6 @@ void _update_info(struct trans_logger_info *inf)
{
struct mars_rotate *rot = inf->inf_private;
int hash;
unsigned long flags;
if (unlikely(!rot)) {
MARS_ERR("rot is NULL\n");
@ -1343,10 +1343,10 @@ void _update_info(struct trans_logger_info *inf)
}
}
traced_lock(&rot->inf_lock, flags);
mutex_lock(&rot->inf_mutex);
memcpy(&rot->infs[hash], inf, sizeof(struct trans_logger_info));
rot->infs_is_dirty[hash] = true;
traced_unlock(&rot->inf_lock, flags);
mutex_unlock(&rot->inf_mutex);
mars_trigger();
done:;
@ -1358,12 +1358,11 @@ void write_info_links(struct mars_rotate *rot)
struct trans_logger_info inf;
int count = 0;
for (;;) {
unsigned long flags;
int hash = -1;
int min = 0;
int i;
traced_lock(&rot->inf_lock, flags);
mutex_lock(&rot->inf_mutex);
for (i = 0; i < MAX_INFOS; i++) {
if (!rot->infs_is_dirty[i])
continue;
@ -1374,13 +1373,13 @@ void write_info_links(struct mars_rotate *rot)
}
if (hash < 0) {
traced_unlock(&rot->inf_lock, flags);
mutex_unlock(&rot->inf_mutex);
break;
}
rot->infs_is_dirty[hash] = false;
memcpy(&inf, &rot->infs[hash], sizeof(struct trans_logger_info));
traced_unlock(&rot->inf_lock, flags);
mutex_unlock(&rot->inf_mutex);
MARS_DBG("seq = %d min_pos = %lld max_pos = %lld log_pos = %lld is_replaying = %d is_logging = %d\n",
inf.inf_sequence,
@ -1402,7 +1401,8 @@ void write_info_links(struct mars_rotate *rot)
if (count) {
if (inf.inf_min_pos == inf.inf_max_pos)
mars_trigger();
mars_remote_trigger();
if (rot->todo_primary | rot->is_primary | rot->old_is_primary)
mars_remote_trigger();
}
}
@ -1425,7 +1425,8 @@ void _make_new_replaylink(struct mars_rotate *rot, char *new_host, int new_seque
_update_version_link(rot, &inf);
mars_trigger();
mars_remote_trigger();
if (rot->todo_primary | rot->is_primary | rot->old_is_primary)
mars_remote_trigger();
}
static
@ -2309,13 +2310,13 @@ int peer_thread(void *data)
status = 0;
if (peer->to_remote_trigger) {
pause_time = 0;
peer->to_remote_trigger = false;
MARS_DBG("sending notify to peer...\n");
cmd.cmd_code = CMD_NOTIFY;
status = mars_send_struct(&peer->socket, &cmd, mars_cmd_meta, true);
}
if (likely(status >= 0)) {
peer->to_remote_trigger = false;
cmd.cmd_code = CMD_GETENTS;
if ((!peer->do_additional || peer->do_communicate) &&
mars_resource_list) {
@ -3039,7 +3040,7 @@ int make_log_init(void *buf, struct mars_dent *dent)
status = -ENOMEM;
goto done;
}
spin_lock_init(&rot->inf_lock);
mutex_init(&rot->inf_mutex);
fetch_path = path_make("%s/logfile-update", parent_path);
if (unlikely(!fetch_path)) {
MARS_ERR("cannot create fetch_path\n");
@ -4437,6 +4438,11 @@ done:
rot->if_brick && !rot->if_brick->power.led_off;
_show_primary(rot, parent);
if (open_count != rot->old_open_count) {
rot->old_open_count = open_count;
mars_remote_trigger();
}
err:
return status;
}
@ -5094,8 +5100,13 @@ static int check_deleted(void *buf, struct mars_dent *dent)
goto done;
}
if (!strcmp(dent->d_rest, my_id()))
if (!strcmp(dent->d_rest, my_id())) {
global->deleted_my_border = serial;
if (global->deleted_my_border != global->old_deleted_my_border) {
global->old_deleted_my_border = global->deleted_my_border;
mars_remote_trigger();
}
}
/* Compute the minimum of the deletion progress among
* the resource members.
@ -5246,7 +5257,7 @@ static const struct main_class main_classes[] = {
[CL_GLOBAL_USERSPACE_ITEMS] = {
.cl_name = "",
.cl_len = 0, // catch any
.cl_type = 'l',
.cl_type = 'L',
.cl_father = CL_GLOBAL_USERSPACE,
},
@ -5419,7 +5430,7 @@ static const struct main_class main_classes[] = {
[CL_RESOURCE_USERSPACE_ITEMS] = {
.cl_name = "",
.cl_len = 0, // catch any
.cl_type = 'l',
.cl_type = 'L',
.cl_father = CL_RESOURCE_USERSPACE,
},
@ -5824,6 +5835,12 @@ static int main_worker(struct mars_global *global, struct mars_dent *dent, bool
return -EINVAL;
}
break;
case 'L':
if (!S_ISLNK(dent->new_stat.mode)) {
/* ignore silently */
return -EINVAL;
}
break;
}
if (likely(class > CL_ROOT)) {
int father = main_classes[class].cl_father;

View File

@ -62,6 +62,29 @@ const char mars_version_string[] = BUILDTAG " (" BUILDHOST " " BUILDDATE ") "
mars_info_fn mars_info = NULL;
static
void interpret_user_message(char *msg)
{
char cmd = msg[0];
char *rest = msg + 1;
while (*rest == ' ')
rest++;
switch (cmd) {
case 'l': /* write to syslog via say logging */
MARS_INF("%s\n", rest);
break;
case 'L': /* write to syslog via printk */
printk("%s\n", rest);
break;
default:
MARS_DBG("unknown user message '%s'\n", msg);
}
}
static
int trigger_sysctl_handler(
struct ctl_table *table,
@ -75,19 +98,25 @@ int trigger_sysctl_handler(
MARS_DBG("write = %d len = %ld pos = %lld\n", write, len, *ppos);
if (!len || *ppos > 0) {
if (len <= 0 || *ppos > 0) {
goto done;
}
if (write) {
char tmp[8] = {};
int code = 0;
char *tmp = brick_string_alloc(len + 1);
res = len; // fake consumption of all data
if (len > 7)
len = 7;
if (!copy_from_user(tmp, buffer, len)) {
if (copy_from_user(tmp, buffer, len)) {
MARS_ERR("cannot read %ld bytes from trigger\n", len);
goto dealloc;
}
tmp[len] = '\0';
if (tmp[0] == ' ' ||
(tmp[0] >= '0' && tmp[0] <= '9')) {
int code = 0;
sscanf(tmp, "%d", &code);
if (code > 0) {
mars_trigger();
@ -97,7 +126,11 @@ int trigger_sysctl_handler(
} else if (code > 1) {
mars_remote_trigger();
}
} else {
interpret_user_message(tmp);
}
dealloc:
brick_string_free(tmp);
} else {
char *answer = "MARS module not operational\n";
char *tmp = NULL;
@ -292,6 +325,7 @@ struct ctl_table mars_table[] = {
.mode = 0400,
.proc_handler = &lamport_sysctl_handler,
},
INT_ENTRY("max_lamport_future", max_lamport_future, 0600),
INT_ENTRY("show_log_messages", brick_say_logging, 0600),
INT_ENTRY("show_debug_messages", brick_say_debug, 0600),
INT_ENTRY("show_statistics_global", global_show_statist, 0600),

View File

@ -111,6 +111,7 @@ struct mars_global {
wait_queue_head_t main_event;
int global_version;
int deleted_my_border;
int old_deleted_my_border;
int deleted_border;
int deleted_min;
bool main_trigger;

View File

@ -1039,6 +1039,16 @@ int get_inode(char *newpath, struct mars_dent *dent)
goto done;
}
/* Correct illegal timestamps */
if (unlikely(protect_timespec(&tmp.mtime)) &&
S_ISLNK(dent->new_stat.mode)) {
char *val = mars_readlink(newpath);
if (val) {
mars_symlink(val, newpath, &tmp.mtime, 0);
brick_string_free(val);
}
}
memcpy(&dent->old_stat, &dent->new_stat, sizeof(dent->old_stat));
memcpy(&dent->new_stat, &tmp, sizeof(dent->new_stat));
@ -1079,19 +1089,18 @@ int get_inode(char *newpath, struct mars_dent *dent)
}
path_put(&path);
} else if (S_ISREG(dent->new_stat.mode) && dent->d_name && !strncmp(dent->d_name, "log-", 4)) {
loff_t min = dent->new_stat.size;
loff_t max = 0;
loff_t min;
dent->d_corr_A = 0;
dent->d_corr_B = 0;
mf_get_any_dirty(newpath, &min, &max, 0, 2);
min = mf_get_any_dirty(newpath, DIRTY_COMPLETED);
if (min < dent->new_stat.size) {
MARS_DBG("file '%s' A size=%lld min=%lld max=%lld\n", newpath, dent->new_stat.size, min, max);
MARS_DBG("file '%s' A size=%lld min=%lld\n", newpath, dent->new_stat.size, min);
dent->d_corr_A = min;
}
mf_get_any_dirty(newpath, &min, &max, 0, 3);
min = mf_get_any_dirty(newpath, DIRTY_FINISHED);
if (min < dent->new_stat.size) {
MARS_DBG("file '%s' B size=%lld min=%lld max=%lld\n", newpath, dent->new_stat.size, min, max);
MARS_DBG("file '%s' B size=%lld min=%lld\n", newpath, dent->new_stat.size, min);
dent->d_corr_B = min;
}
}
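The timestamp correction above re-creates symlinks whose mtimes lie implausibly far in the future (cf. the new max_lamport_future sysctl earlier in this diff). A rough userspace analogue of the plausibility check, with an invented one-day bound (the real check, protect_timespec, lives in the kernel module):

# Sketch only: the bound is an invented stand-in for max_lamport_future.
sub implausible_mtime {
	my ($path) = @_;
	my $mtime = (lstat($path))[9];
	return 0 unless defined($mtime);
	return $mtime > time() + 86400;   # far-future mtime => implausible
}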

View File

@ -32,7 +32,7 @@ umask 0077;
# global defaults
my $threshold = 10 * 1024 * 1024;
my $window = 30;
my $window = 60;
my $verbose = 0;
my $max_deletions = 512;
my $thresh_logfiles = 10;
@ -40,7 +40,13 @@ my $thresh_logsize = 5; # GB
my $dry_run = 0;
my @MARS_PATH = $ENV{MARS_PATH} ?
split(/:/, $ENV{MARS_PATH}) :
(".", "~/.marsadm", "/etc/marsadm", "/usr/lib/marsadm", "/usr/local/lib/marsadm");
(
".",
"$ENV{HOME}/.marsadm",
"/etc/marsadm",
"/usr/lib/marsadm",
"/usr/local/lib/marsadm",
);
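The rewritten default list also replaces the former literal ~/.marsadm entry: Perl's file operators do not expand ~ (only the shell does), so that entry could never match. A quick demonstration:

# '~' is passed through literally by Perl's file tests:
print((-d "~/.marsadm") ? "hit\n" : "miss\n");          # practically always "miss"
print((-d "$ENV{HOME}/.marsadm") ? "hit\n" : "miss\n"); # the working check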
##################################################################
@ -288,7 +294,7 @@ sub get_link_stamp {
sub is_link_recent {
my ($path, $wind) = @_;
$wind = $window unless defined($wind);
$wind = $window * 2 unless defined($wind);
my @stat = lstat($path);
return 0 if (!@stat);
return 1 if $stat[9] + $wind >= mars_time();
@ -323,6 +329,8 @@ sub finish_links {
return unless @link_list;
my $timestamp = mars_time();
lprint "using lamport timestamp $timestamp\n" if $verbose;
my $trigger_code = 1;
my $count = 0;
while (my $link = shift @link_list) {
my $link_tmp = to_tmp($link);
my $target = readlink($link_tmp);
@ -342,8 +350,10 @@ sub finish_links {
if ($verbose) {
lprint "created symlink '$link' -> '$target'\n";
}
$count++;
$trigger_code = 2 if $link =~ m:/(primary|todo-global|ip):;
}
_trigger();
_trigger($trigger_code) if $count > 0;
}
##################################################################
@ -527,9 +537,8 @@ sub wait_cluster {
}
last;
}
_trigger($hosts =~ m/\*/ ? 3 : 2);
sleep_timeout(5, !$abort);
last if $timeout <= 0;
sleep_timeout();
last if $timeout <= 0 && !$unknown_count;
}
$timeout = $old_timeout;
}
@ -679,7 +688,7 @@ sub check_sync_finished {
}
sub check_primary {
my ($cmd, $res) = @_;
my ($cmd, $res, $no_designated) = @_;
my $lnk = "$mars/resource-$res/actual-$host/is-primary";
my $is_primary = get_link($lnk, 1);
if (!$is_primary) { # give it a second chance
@ -688,6 +697,7 @@ sub check_primary {
$is_primary = 1 if -b $dev;
}
ldie "for operation '$cmd' I need to be primary\n" unless $is_primary;
return if $no_designated;
my $primary = _get_designated_primary($res);
ldie "for operation '$cmd', I also must be the designated primary\n" unless $primary eq $host;
}
@ -1034,12 +1044,24 @@ sub _get_prev_pos {
my ($basedir, $nr, $peer) = @_;
my $path = sprintf("version-%09d-$peer", $nr);
my $vers = get_link("$basedir/$path", 2);
if (!defined($vers) || !$vers) {
# Rare race.
# This can happen when new logfiles are present but not yet worked on.
# This also improves robustness on damaged filesystems.
# Decrement nr by hand, retry, and take the first part.
$path = sprintf("version-%09d-$peer", $nr - 1);
$vers = get_link("$basedir/$path", 2);
$vers =~ s/:.*// if defined($vers) && $vers;
} else {
# take the last part, pointing to the predecessor versionlink.
$vers =~ s/^.*://;
}
_parse_pos($basedir, $path) if defined($vers) && $vers;
$vers =~ s/^.*://;
return $vers;
}
sub _get_common_ancestor {
my ($total1, $total2) = (0, 0);
for (;;) {
my ($basedir, $pos1, $host1, $dep1, $pos2, $host2, $dep2) = @_;
my ($p1, $nr1, $from1, $len1) = _parse_pos($basedir, $pos1);
@ -1051,37 +1073,43 @@ sub _get_common_ancestor {
my $path1 = sprintf("$basedir/version-%09d-$from1", $nr1);
my $path2 = sprintf("$basedir/version-%09d-$from2", $nr2);
if (my $vers1 = get_link($path1, 1) and my $vers2 = get_link($path2, 1)) {
$split = 1 if $vers1 ne $vers2;
if ($vers1 ne $vers2) {
$split = 1;
$total1 += $len1;
$total2 += $len2;
}
}
}
return ($p1, $split);
return ($p1, $split, $total1, $total2);
} elsif ($nr1 > $nr2) {
# just flip arguments
@_ = ($basedir, $pos2, $host2, $dep2, $pos1, $host1, $dep1);
($total1, $total2) = ($total2, $total1);
next;
} elsif ($nr1 < $nr2) {
# recursively advance path depth
$total2 += $len2;
my $vers2 = _get_prev_pos($basedir, $nr2, $host2);
return ("", -1) if !$vers2;
return ("", -1, $total1, $total2) if !$vers2;
@_ = ($basedir, $pos1, $host1, $dep1, $vers2, $host2, $dep2 + 1);
next;
} elsif ($from1 ne $from2) {
# split brain is certain now, but continue computing the common split point
my $vers1 = _get_prev_pos($basedir, $nr1, $host1);
return ("", 1) if !$vers1;
return ("", 1, $total1 + $len1, $total2 + $len2) if !$vers1;
my $vers2 = _get_prev_pos($basedir, $nr2, $host2);
return ("", 1) if !$vers2;
my ($res, $split) = _get_common_ancestor($basedir, $vers1, $host1, $dep1 + 1, $vers2, $host2, $dep2 + 1);
return ($res, 1);
return ("", 1, $total1 + $len1, $total2 + $len2) if !$vers2;
my ($res, $split, $t1, $t2) = _get_common_ancestor($basedir, $vers1, $host1, $dep1 + 1, $vers2, $host2, $dep2 + 1);
return ($res, 1, $t1 + $total1 + $len1, $t2 + $total2 + $len2);
} elsif ($len1 < $len2) {
# there may be no split brain (just incomplete replay) depending on path depth
return ($p1, $dep1);
return ($p1, $dep1, $total1 + $len1, $total2 + $len2);
} elsif ($len2 < $len1) {
# ditto, symmetric case
return ($p2, $dep2);
return ($p2, $dep2, $total1 + $len1, $total2 + $len2);
}
lwarn "error in algorithm: $p1, $nr1, $from1, $len1 : $p2, $nr2, $from2, $len2\n";
return ("", -1);
return ("", -1, 0, 0);
}
}
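The two new accumulators turn the ancestor search into a size estimate: they sum the logfile lengths written on either side since the last common version. A hedged sketch of how a caller reads the extended four-tuple (variable names invented; the real consumer is detect_splitbrain below):

my ($point, $split, $bytes_a, $bytes_b) =
	get_common_ancestor($basedir, $host1, $host2);
if ($split) {
	# each total approximates the data that would be discarded when
	# resolving the split brain in favour of the other side
	lwarn "hostA diverged by $bytes_a bytes, hostB by $bytes_b bytes\n";
}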
@ -1105,11 +1133,25 @@ sub detect_splitbrain {
foreach my $host1 (@hosts) {
foreach my $host2 (@hosts) {
next if $host1 ge $host2;
my ($res, $split) = get_common_ancestor($basedir, $host1, $host2);
my ($point, $split, $size1, $size2) = get_common_ancestor($basedir, $host1, $host2);
if ($split) {
$ok = 0;
if ($do_report) {
lwarn "SPLIT BRAIN at '$res' detected: hostA = '$host1', hostB = '$host2'\n";
my $age = "";
if ($point) {
my $log = "$basedir/$point";
$log =~ s:,.+::;
my $stamp = get_link_stamp($log);
my $vers = $log;
$vers =~ s:/log-:/version-:;
my $stamp2 = get_link_stamp($vers);
# take the minimum
$stamp = $stamp2 if !$stamp || ($stamp2 && $stamp2 < $stamp);
$age = " age ~" . seconds2human(mars_time() - $stamp) if $stamp;
}
lwarn "SPLIT BRAIN of '$res' at '$point'$age\n";
lwarn " hostA = '$host1' data_size='$size1' (" . number2human($size1) . ")\n";
lwarn " hostB = '$host2' data_size='$size2' (" . number2human($size2) . ")\n";
} else {
return $ok;
}
@ -1285,7 +1327,7 @@ sub log_purge_res {
sub try_to_avoid_splitbrain {
my ($cmd, $res, $old_primary) = @_;
my $old_timeout = $timeout;
$timeout = $window * 2 if $timeout < 0;
$timeout = $window if $timeout < 0;
$old_primary = "" if $old_primary eq "(none)";
wait_cluster($cmd, $res, $old_primary);
if (!detect_splitbrain($res, 0)) {
@ -1300,7 +1342,7 @@ sub try_to_avoid_splitbrain {
my @host_list = glob("$mars/resource-$res/replay-*");
$timeout = $old_timeout;
return if scalar(@host_list) < 2;
$timeout = $window * 2 if $timeout < 0;
$timeout = $window if $timeout < 0;
my $old_situation = "";
for (;;) {
my ($min, $max) = get_minmax_versions($res);
@ -1363,7 +1405,7 @@ sub try_to_avoid_splitbrain {
my $tpl = get_macro("replinfo");
my $new_situation = eval_macro($cmd, $res, $tpl, @_);
print $new_situation;
$timeout = $window * 2 if $new_situation ne $old_situation;
$timeout = $window if $new_situation ne $old_situation;
sleep_timeout();
$old_situation = $new_situation;
}
@ -1524,7 +1566,7 @@ sub get_error_text {
# helpers
sub _trigger{
my ($code) = shift || 2;
my $code = shift || 1;
system("(echo $code > /proc/sys/mars/trigger) >/dev/null 2>&1");
}
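Note the changed default: _trigger() now sends code 1 (local only), so ordinary link updates no longer fan out to all peers; callers that really need remote propagation pass a higher code explicitly, as finish_links and wait_cluster above now do. Illustrative calls:

_trigger();   # code 1: local trigger only (new default)
_trigger(2);  # code 2: additionally notify remote peers
_trigger(3);  # code 3: passed by wait_cluster for '*' host lists; its
              # kernel-side meaning is not visible in this diff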
@ -2315,7 +2357,7 @@ sub delete_res {
sub logrotate_res {
my ($cmd, $res) = @_;
check_primary(@_) unless $force;
check_primary($cmd, $res, 1) unless $force;
my @paths = glob("$mars/resource-$res/log-*-$host");
my $last;
if (@paths) {
@ -2522,8 +2564,15 @@ sub logdelete_res {
_create_delete($first);
}
my @versionlinks = glob("$mars/resource-$res/version-*");
# don't remove versionlinks during split brain as long as possible
if (scalar(@versionlinks) < $max_deletions / 8 &&
!detect_splitbrain($res, 1)) {
lwarn "SPLIT BRAIN: keep some versionlinks for better reporting\n";
return unless $force;
}
lprint "removing left-over version symlinks...\n" if $verbose;
foreach my $versionlink (glob("$mars/resource-$res/version-*")) {
foreach my $versionlink (@versionlinks) {
my $nrv = $versionlink;
$nrv =~ s/^.*\/version-([0-9]+)-.+$/$1/;
# we need at least one more version link than logfiles for consistency checks
@ -3123,38 +3172,57 @@ sub seconds2human {
}
sub number2human {
my ($unit, $number) = @_;
my $k = 1024;
my $use_float = ($number =~ m/\./);
$k = 1024.0 if $use_float;
my ($number, $unit, $max) = @_;
$max = $number if !defined($max);
my $k = 1024.0;
if (!defined($unit) || !$unit) {
if ($max >= $k * $k * $k * $k) {
$unit = "T";
} elsif ($max >= $k * $k * $k) {
$unit = "G";
} elsif ($max >= $k * $k) {
$unit = "M";
} elsif ($max >= $k) {
$unit = "K";
} else {
$unit = "";
}
}
my $i = "i";
if ($unit =~ m/^[a-z]/) {
$i = "";
$k = 1000.0;
}
$_ = $unit;
SWITCH: {
if (/t/i) {
$number /= $k * $k * $k * $k;
$unit = "TiB";
$unit = "T${i}B";
last SWITCH;
}
if (/g/i) {
$number /= $k * $k * $k;
$unit = "GiB";
$unit = "G${i}B";
last SWITCH;
}
if (/m/i) {
$number /= $k * $k;
$unit = "MiB";
$unit = "M${i}B";
last SWITCH;
}
if (/k/i) {
$number /= $k;
$unit = "KiB";
$unit = "K${i}B";
last SWITCH;
}
$unit = "B";
}
if ($use_float || ($number =~ m/\./)) {
if ($unit eq "B") {
$number = int($number + 0.1);
} else {
$number = sprintf("%.3f", $number);
}
return ($unit, $number);
return "$number $unit";
}
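The rewritten helper now chooses the unit itself (from $max, so a list of numbers stays comparable in one unit) and returns a single formatted string; an uppercase unit letter selects binary factors (KiB, k = 1024), a lowercase one decimal factors (k = 1000, with the 'i' dropped). Some illustrative calls, with results derived by hand from the code above:

print number2human(512), "\n";                # "512 B"     (unit B is int-rounded)
print number2human(1536), "\n";               # "1.500 KiB" (auto unit, k = 1024)
print number2human(1500, "k"), "\n";          # "1.500 KB"  (lowercase => k = 1000)
print number2human(1024, undef, 2**31), "\n"; # "0.000 GiB" (unit forced by $max)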
sub progress_bar {
@ -3433,23 +3501,12 @@ sub eval_fn {
$max = $number if $number > $max;
}
lwarn "macro $name: no number arguments given\n" unless @list;
if (!$unit) {
if ($max >= 999 * 1024*1024*1024*1024) {
$unit = "T";
} elsif ($max >= 999 * 1024*1024*1024) {
$unit = "G";
} elsif ($max >= 99 * 1024*1024) {
$unit = "M";
} elsif ($max >= 9 * 1024) {
$unit = "K";
} else {
$unit = "";
}
}
my @results = ();
my $conv_unit = "";
my @results = ();
foreach my $number (@list) {
($conv_unit, my $conv_number) = number2human($unit, $number);
my $conv_number = number2human($number, $unit, $max);
$conv_number =~ s/ *([a-z].*)//i;
$conv_unit = $1;
push @results, $conv_number;
}
return join($delim_numbers, @results) . "$delim_unit$conv_unit";